2 Copyright (C) Slava Astashonok <sla@0n.ru>
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License.
7 $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
9 7/11/2007 Sapan Bhatia <sapanb@cs.princeton.edu>
11 Added data collection (-f) functionality, xid support in the header and log file
17 /* stdout, stderr, freopen() */
23 /* getopt(), alarm(), getpid(), setsid(), chdir() */
34 #include <sys/statfs.h>
36 #include <libipulog/libipulog.h>
/* Inlined (partial) copy of libipulog's otherwise-private handle layout,
 * presumably so this file can reach the handle's netlink socket fd directly
 * (see the setsockopt() call in main()).  NOTE(review): fragment — the fd
 * member and closing brace are not visible here; confirm against the
 * installed libipulog headers. */
37 struct ipulog_handle {
40 struct sockaddr_nl local;
41 struct sockaddr_nl peer;
42 struct nlmsghdr* last_nlhdr;
45 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
46 #include <sys/types.h>
47 #include <netinet/in_systm.h>
48 #include <sys/socket.h>
49 #include <netinet/in.h>
50 #include <arpa/inet.h>
51 #include <netinet/ip.h>
52 #include <netinet/tcp.h>
53 #include <netinet/udp.h>
54 #include <netinet/ip_icmp.h>
57 #include <sys/param.h>
82 #include <sys/select.h>
88 #include <fprobe-ulog.h>
90 #include <my_getopt.h>
/* Command-line option table consumed by my_getopt(); every flag listed here
 * takes a required argument (see the usage text for per-flag meaning). */
123 static struct getopt_parms parms[] = {
124 {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
125 {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
126 {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
127 {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
128 {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
129 {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
130 {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
131 {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
132 {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
134 {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
135 {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
137 {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
138 {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
139 {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
140 {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
141 {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
142 {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
143 {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
144 {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
145 {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
146 {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
/* getopt(3)-style bookkeeping provided by the option parser. */
151 extern int optind, opterr, optopt;
/* NetFlow v1/v5/v7 export-format descriptors, defined elsewhere in the project. */
154 extern struct NetFlow NetFlow1;
155 extern struct NetFlow NetFlow5;
156 extern struct NetFlow NetFlow7;
/* Sentinel "no file descriptor yet" value for PEER_FILE peers (see get_log_fd). */
158 #define START_VALUE -5
/* -M flag: when set, the netfilter mark is exported as the ToS byte. */
159 #define mark_is_tos parms[Mflag].count
/* Timers (seconds) controlling the scan/expiry machinery; defaults match usage text. */
160 static unsigned scan_interval = 5;
161 static int min_free = 0;
162 static int frag_lifetime = 30;
163 static int inactive_lifetime = 60;
164 static int active_lifetime = 300;
165 static int sockbufsize;
/* Largest representable bulk size for the memory allocator's index type. */
166 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
167 #if (MEM_BITS == 0) || (MEM_BITS == 16)
168 #define BULK_QUANTITY 10000
170 #define BULK_QUANTITY 200
/* Log-file epoch rotation state (-E/-T/-W flags). */
173 static unsigned epoch_length=60, log_epochs=1;
174 static unsigned cur_epoch=0,prev_uptime=0;
176 static unsigned bulk_quantity = BULK_QUANTITY;
177 static unsigned pending_queue_length = 100;
/* Selected export format; defaults to NetFlow v5 (-n flag overrides). */
178 static struct NetFlow *netflow = &NetFlow5;
179 static unsigned verbosity = 6;
180 static unsigned log_dest = MY_LOG_SYSLOG;
/* Reference point for uptime computations (see getuptime/getuptime_minutes). */
181 static struct Time start_time;
182 static long start_time_offset;
/* Allocator statistics exported by the mem_* module. */
185 extern unsigned total_elements;
186 extern unsigned free_elements;
187 extern unsigned total_memory;
/* Instrumentation counters, compiled in only with DEBUG_I. */
188 #if ((DEBUG) & DEBUG_I)
189 static unsigned emit_pkts, emit_queue;
190 static uint64_t size_total;
191 static unsigned pkts_total, pkts_total_fragmented;
192 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
193 static unsigned pkts_pending, pkts_pending_done;
194 static unsigned pending_queue_trace, pending_queue_trace_candidate;
195 static unsigned flows_total, flows_fragmented;
/* Emit-side state: flow count/sequence in the PDU being built, rate limiting. */
197 static unsigned emit_count;
198 static uint32_t emit_sequence;
199 static unsigned emit_rate_bytes, emit_rate_delay;
200 static struct Time emit_time;
201 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
202 static pthread_t thid;
203 static sigset_t sig_mask;
204 static struct sched_param schedp;
205 static int sched_min, sched_max;
/* Collector peers parsed from the command line. */
206 static int npeers, npeers_rot;
207 static struct peer *peers;
/* Hash table of active flows with one mutex per bucket. */
210 static struct Flow *flows[1 << HASH_BITS];
211 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
/* Synchronization for the capture -> unpending -> scan -> emit pipeline. */
213 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
214 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
216 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
217 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
/* Circular pending queue (built in main) and expired-fragment chain. */
218 static struct Flow *pending_head, *pending_tail;
219 static struct Flow *scan_frag_dreg;
221 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
222 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
223 static struct Flow *flows_emit;
/* Process identity / pidfile handling. */
225 static char ident[256] = "fprobe-ulog";
226 static FILE *pidfile;
227 static char *pidfilepath;
230 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
/* ULOG capture state: handle, group mask (-U), and read buffer. */
231 static struct ipulog_handle *ulog_handle;
232 static uint32_t ulog_gmask = 1;
233 static char *cap_buf;
/* Interface-name -> SNMP-index conversion rules (-X flag). */
234 static int nsnmp_rules;
235 static struct snmp_rule *snmp_rules;
/* Target user for privilege drop (-u flag); NULL when not requested. */
236 static struct passwd *pw = 0;
/* Help text printed on -h / bad arguments.  NOTE(review): the enclosing
 * function's header is not visible in this fragment; the format arguments on
 * the last line fill the %s/%u/%d placeholders above. */
241 "fprobe-ulog: a NetFlow probe. Version %s\n"
242 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
244 "-h\t\tDisplay this help\n"
245 "-U <mask>\tULOG group bitwise mask [1]\n"
246 "-s <seconds>\tHow often scan for expired flows [5]\n"
247 "-g <seconds>\tFragmented flow lifetime [30]\n"
248 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
249 "-f <filename>\tLog flow data in a file\n"
250 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
251 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
252 "-a <address>\tUse <address> as source for NetFlow flow\n"
253 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
254 "-M\t\tUse netfilter mark value as ToS flag\n"
255 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
256 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
257 "-q <flows>\tPending queue length [100]\n"
258 "-B <kilobytes>\tKernel capture buffer size [0]\n"
259 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
260 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
261 "-c <directory>\tDirectory to chroot to\n"
262 "-u <user>\tUser to run as\n"
263 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
264 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
265 "-y <remote:port>\tAddress of the NetFlow collector\n"
266 "-f <writable file>\tFile to write data into\n"
267 "-T <n>\tRotate log file every n epochs\n"
268 "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
269 "-E <[1..60]>\tSize of an epoch in minutes\n"
270 "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
272 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
/* Dump of the DEBUG_I instrumentation counters (packet, flow, queue and
 * memory statistics).  NOTE(review): the enclosing function's header is not
 * visible in this fragment — presumably triggered from the SIGUSR1 path. */
276 #if ((DEBUG) & DEBUG_I)
279 my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
280 pkts_total, pkts_total_fragmented, size_total,
281 pkts_pending - pkts_pending_done, pending_queue_trace);
282 my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
283 pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
284 my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
285 flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
286 my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
287 total_elements, free_elements, total_memory);
/* Async-signal handler: records which signal arrived by setting a bit in the
 * global `sigs` mask; the main loop reacts to the mask later.  SIGUSR1
 * handling is compiled in only with DEBUG_I instrumentation. */
291 void sighandler(int sig)
295 sigs |= SIGTERM_MASK;
297 #if ((DEBUG) & DEBUG_I)
299 sigs |= SIGUSR1_MASK;
/* Fill *now with the current wall-clock time (struct Time holds sec/usec;
 * the gettimeofday() call is not visible in this fragment). */
305 void gettime(struct Time *now)
311 now->usec = t.tv_usec;
/* Difference t1 - t2 in whole minutes (seconds-only; usec ignored). */
315 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
317 return (t1->sec - t2->sec)/60;
/* Difference t1 - t2 in milliseconds (combines sec and usec components). */
320 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
322 return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
325 /* Uptime in milliseconds */
326 uint32_t getuptime(struct Time *t)
328 /* Maximum uptime is about 49/2 days (32-bit millisecond counter) */
329 return cmpmtime(t, &start_time);
332 /* Uptime in minutes */
333 uint32_t getuptime_minutes(struct Time *t)
335 /* Maximum uptime is about 49/2 days */
336 return cmpMtime(t, &start_time);
/* Bucket hash for a flow: fragmented flows hash only the fragment-key prefix
 * (struct Flow_F), complete flows also include the transport-layer fields
 * (struct Flow_TL). */
339 hash_t hash_flow(struct Flow *flow)
341 if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
342 else return hash(flow, sizeof(struct Flow_TL));
/* Map an interface name to an SNMP ifIndex: first try the user-supplied -X
 * rules (basename prefix + numeric suffix + base offset), then fall back to
 * the kernel's if_nametoindex(); empty names yield 0. */
345 uint16_t snmp_index(char *name) {
348 if (!*name) return 0;
350 for (i = 0; (int) i < nsnmp_rules; i++) {
351 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
352 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
355 if ((i = if_nametoindex(name))) return i;
/* Field-by-field copy of a Flow record from src to dst (address/port fields
 * are copied on lines not visible in this fragment).  Used when inserting a
 * pending-queue entry into the hash table (COPY_INTO in put_into). */
360 inline void copy_flow(struct Flow *src, struct Flow *dst)
367 dst->proto = src->proto;
368 dst->tcp_flags = src->tcp_flags;
372 dst->pkts = src->pkts;
373 dst->size = src->size;
374 dst->sizeF = src->sizeF;
375 dst->sizeP = src->sizeP;
376 dst->ctime = src->ctime;
377 dst->mtime = src->mtime;
378 dst->flags = src->flags;
/* Restore the last log-epoch counter from /tmp/fprobe_last_epoch so epoch
 * numbering survives a restart (written by update_cur_epoch_file). */
381 void get_cur_epoch() {
383 fd = open("/tmp/fprobe_last_epoch",O_RDONLY);
387 sscanf(snum,"%d",&cur_epoch);
/* Persist the current epoch number to /tmp/fprobe_last_epoch; failure is
 * logged but non-fatal (a later restart just resumes from epoch 0).
 * NOTE(review): open() with O_CREAT is missing the mandatory mode argument —
 * POSIX requires a third argument (e.g. 0644); as written the created file's
 * permissions are whatever garbage is on the stack. The unchecked write() is
 * also worth a look. */
393 void update_cur_epoch_file(int n) {
396 len=snprintf(snum,6,"%d",n);
397 fd = open("/tmp/fprobe_last_epoch",O_WRONLY|O_CREAT|O_TRUNC);
399 my_log(LOG_ERR, "open() failed: /tmp/fprobe_last_epoch.The next restart will resume logging from epoch id 0.");
402 write(fd, snum, len);
/* Return a file descriptor for the current log epoch, rotating to the next
 * epoch file ("<fname>.<epoch>") when the epoch has elapsed or no file is
 * open yet, and reusing/abandoning epochs when the disk runs low (-D flag).
 * NOTE(review): cur_fd is declared unsigned yet compared with `< 0` below —
 * that comparison is always false; the START_VALUE sentinel check is what
 * actually catches the "no fd yet" case.  The open() here also passes O_CREAT
 * without a mode argument (same defect as update_cur_epoch_file). */
406 unsigned get_log_fd(char *fname, unsigned cur_fd) {
409 /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
410 * doesn't solve the problem */
412 struct statfs statfs;
415 cur_uptime = getuptime_minutes(&now);
417 if (fstatfs(cur_fd, &statfs) && cur_fd!=START_VALUE) {
418 my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
421 if (min_free && statfs.f_bfree < min_free)
423 case 0: /* Uh oh. Our first file filled up all of the free space. Just bail out. */
424 my_log(LOG_ERR, "The first epoch filled up all the free space on disk. Bailing out.");
427 my_log(LOG_INFO, "Disk almost full. I'm going to drop data. Max epochs = %d\n",cur_epoch);
432 /* Epoch length in minutes */
433 if ((cur_uptime - prev_uptime) > epoch_length || cur_fd<0 || cur_epoch==-1) {
434 char nextname[MAX_PATH_LEN];
436 prev_uptime = cur_uptime;
437 cur_epoch = (cur_epoch + 1) % log_epochs;
439 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
440 if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC)) < 0) {
441 my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
444 update_cur_epoch_file(cur_epoch);
/* Walk the bucket chain starting at `where` looking for a flow matching
 * `what`: same src/dst IP and protocol, then either matching ports (both
 * unfragmented) or matching IP id (both fragmented).  On return, if `prev`
 * is non-NULL it receives the address of the pointer that links to the match
 * (or to the chain end), so callers can unlink in O(1). */
452 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
454 struct Flow **flowpp;
460 if (prev) flowpp = *prev;
463 if (where->sip.s_addr == what->sip.s_addr
464 && where->dip.s_addr == what->dip.s_addr
465 && where->proto == what->proto) {
466 switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
468 /* Both unfragmented */
469 if ((what->sp == where->sp)
470 && (what->dp == where->dp)) goto done;
473 /* Both fragmented */
474 if (where->id == what->id) goto done;
478 flowpp = &where->next;
482 if (prev) *prev = flowpp;
/* Insert `flow` into the hash table bucket for its hash, merging into an
 * existing matching entry when one is found.  `flag` selects ownership:
 * COPY_INTO allocates a fresh record and copies (used for pending-queue
 * entries that stay in the ring), MOVE_INTO links the record itself in and
 * frees it after a merge.  Fragment accounting: byte counts accumulate in
 * sizeF; once the last fragment is seen and sizeF covers the datagram size
 * (sizeP), the flow is re-inserted recursively without FLOW_FRAG, now keyed
 * on transport-layer fields.  Returns nonzero on allocation failure. */
486 int put_into(struct Flow *flow, int flag
487 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
494 struct Flow *flown, **flowpp;
495 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
500 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
501 sprintf(buf, " %x H:%04x", (unsigned) flow, h);
504 pthread_mutex_lock(&flows_mutex[h]);
506 if (!(flown = find(flows[h], flow, &flowpp))) {
507 /* No suitable flow found - add */
508 if (flag == COPY_INTO) {
509 if ((flown = mem_alloc())) {
510 copy_flow(flow, flown);
513 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
514 my_log(LOG_ERR, "%s %s. %s",
515 "mem_alloc():", strerror(errno), "packet lost");
520 flow->next = flows[h];
522 #if ((DEBUG) & DEBUG_I)
524 if (flow->flags & FLOW_FRAG) flows_fragmented++;
526 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
528 sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
533 /* Found suitable flow - update */
534 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
535 sprintf(buf, " +> %x", (unsigned) flown);
/* Keep the earliest ctime and the latest mtime across merged packets. */
538 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
539 flown->mtime = flow->mtime;
540 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
541 flown->ctime = flow->ctime;
542 flown->tcp_flags |= flow->tcp_flags;
543 flown->size += flow->size;
544 flown->pkts += flow->pkts;
545 if (flow->flags & FLOW_FRAG) {
546 /* Fragmented flow require some additional work */
547 if (flow->flags & FLOW_TL) {
550 Several packets with FLOW_TL (attack)
552 flown->sp = flow->sp;
553 flown->dp = flow->dp;
555 if (flow->flags & FLOW_LASTFRAG) {
558 Several packets with FLOW_LASTFRAG (attack)
560 flown->sizeP = flow->sizeP;
562 flown->flags |= flow->flags;
563 flown->sizeF += flow->sizeF;
564 if ((flown->flags & FLOW_LASTFRAG)
565 && (flown->sizeF >= flown->sizeP)) {
566 /* All fragments received - flow reassembled */
567 *flowpp = flown->next;
568 pthread_mutex_unlock(&flows_mutex[h]);
569 #if ((DEBUG) & DEBUG_I)
574 flown->flags &= ~FLOW_FRAG;
575 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
/* Re-insert the reassembled flow under its transport-layer key. */
578 ret = put_into(flown, MOVE_INTO
579 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
585 if (flag == MOVE_INTO) mem_free(flow);
587 pthread_mutex_unlock(&flows_mutex[h]);
/* Serialize `fields` entries of a NetFlow field-format table into the packet
 * buffer at p, reading values from `flow` (NULL for header-only fields such
 * as VERSION/COUNT/UPTIME, which read globals instead).  Returns the advanced
 * write pointer.  Values are converted to network byte order; ports are
 * already stored big-endian in the flow record and copied as-is. */
591 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
595 for (i = 0; i < fields; i++) {
596 #if ((DEBUG) & DEBUG_F)
597 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
600 case NETFLOW_IPV4_SRC_ADDR:
601 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
602 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
605 case NETFLOW_IPV4_DST_ADDR:
606 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
/* Hard-coded test-flow tracer (matches the check in the capture thread). */
607 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
608 my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
610 p += NETFLOW_IPV4_DST_ADDR_SIZE;
613 case NETFLOW_INPUT_SNMP:
614 *((uint16_t *) p) = htons(flow->iif);
615 p += NETFLOW_INPUT_SNMP_SIZE;
618 case NETFLOW_OUTPUT_SNMP:
619 *((uint16_t *) p) = htons(flow->oif);
620 p += NETFLOW_OUTPUT_SNMP_SIZE;
623 case NETFLOW_PKTS_32:
624 *((uint32_t *) p) = htonl(flow->pkts);
625 p += NETFLOW_PKTS_32_SIZE;
628 case NETFLOW_BYTES_32:
629 *((uint32_t *) p) = htonl(flow->size);
630 p += NETFLOW_BYTES_32_SIZE;
633 case NETFLOW_FIRST_SWITCHED:
634 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
635 p += NETFLOW_FIRST_SWITCHED_SIZE;
638 case NETFLOW_LAST_SWITCHED:
639 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
640 p += NETFLOW_LAST_SWITCHED_SIZE;
643 case NETFLOW_L4_SRC_PORT:
644 *((uint16_t *) p) = flow->sp;
645 p += NETFLOW_L4_SRC_PORT_SIZE;
648 case NETFLOW_L4_DST_PORT:
649 *((uint16_t *) p) = flow->dp;
650 p += NETFLOW_L4_DST_PORT_SIZE;
654 *((uint8_t *) p) = flow->proto;
655 p += NETFLOW_PROT_SIZE;
658 case NETFLOW_SRC_TOS:
659 *((uint8_t *) p) = flow->tos;
660 p += NETFLOW_SRC_TOS_SIZE;
663 case NETFLOW_TCP_FLAGS:
664 *((uint8_t *) p) = flow->tcp_flags;
665 p += NETFLOW_TCP_FLAGS_SIZE;
/* Header fields below read globals (netflow, emit_count, emit_time). */
668 case NETFLOW_VERSION:
669 *((uint16_t *) p) = htons(netflow->Version);
670 p += NETFLOW_VERSION_SIZE;
674 *((uint16_t *) p) = htons(emit_count);
675 p += NETFLOW_COUNT_SIZE;
679 *((uint32_t *) p) = htonl(getuptime(&emit_time));
680 p += NETFLOW_UPTIME_SIZE;
683 case NETFLOW_UNIX_SECS:
684 *((uint32_t *) p) = htonl(emit_time.sec);
685 p += NETFLOW_UNIX_SECS_SIZE;
688 case NETFLOW_UNIX_NSECS:
689 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
690 p += NETFLOW_UNIX_NSECS_SIZE;
693 case NETFLOW_FLOW_SEQUENCE:
/* Sequence deliberately zeroed here; per-peer sequence is patched in at
 * SeqOffset by the emit thread just before each write/send. */
694 //*((uint32_t *) p) = htonl(emit_sequence);
695 *((uint32_t *) p) = 0;
696 p += NETFLOW_FLOW_SEQUENCE_SIZE;
700 /* Unsupported (uint8_t) */
701 case NETFLOW_ENGINE_TYPE:
702 case NETFLOW_ENGINE_ID:
703 case NETFLOW_FLAGS7_1:
704 case NETFLOW_SRC_MASK:
705 case NETFLOW_DST_MASK:
706 *((uint8_t *) p) = 0;
707 p += NETFLOW_PAD8_SIZE;
/* XID extension: ToS byte repurposed as a 16-bit xid field. */
710 *((uint16_t *) p) = flow->tos;
711 p += NETFLOW_XID_SIZE;
714 /* Unsupported (uint16_t) */
717 case NETFLOW_FLAGS7_2:
718 *((uint16_t *) p) = 0;
719 p += NETFLOW_PAD16_SIZE;
723 /* Unsupported (uint32_t) */
724 case NETFLOW_IPV4_NEXT_HOP:
725 case NETFLOW_ROUTER_SC:
726 *((uint32_t *) p) = 0;
727 p += NETFLOW_PAD32_SIZE;
731 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
732 format, i, format[i]);
736 #if ((DEBUG) & DEBUG_F)
737 my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
/* Emit thread: drains the flows_emit list into emit_packet, and whenever the
 * PDU is full (MaxFlows) or the emit timeout fires with data pending, fills
 * in the header and delivers the PDU to every configured peer (file, mirror,
 * or rotate).  NOTE(review): enclosing function header not visible in this
 * fragment. */
744 Workaround for clone()-based threads
745 Try to change EUID independently of main thread
749 setregid(pw->pw_gid, pw->pw_gid);
750 setreuid(pw->pw_uid, pw->pw_uid);
759 struct timespec timeout;
760 int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
/* Flow records are appended after the fixed-size header. */
762 p = (void *) &emit_packet + netflow->HeaderSize;
768 pthread_mutex_lock(&emit_mutex);
769 while (!flows_emit) {
770 gettimeofday(&now, 0);
771 timeout.tv_sec = now.tv_sec + emit_timeout;
772 /* Do not wait until emit_packet is completely filled - it may take too long */
773 if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
774 pthread_mutex_unlock(&emit_mutex);
/* Pop one flow from the emit list while still holding emit_mutex. */
779 flows_emit = flows_emit->next;
780 #if ((DEBUG) & DEBUG_I)
783 pthread_mutex_unlock(&emit_mutex);
787 gettime(&start_time);
788 start_time.sec -= start_time_offset;
791 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
795 printf("Emit count = %d\n", emit_count);
798 if (emit_count == netflow->MaxFlows) {
801 p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
802 size = netflow->HeaderSize + emit_count * netflow->FlowSize;
803 /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
804 if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
806 for (i = 0; i < npeers; i++) {
807 if (peers[i].type == PEER_FILE) {
808 if (netflow->SeqOffset)
/* NOTE(review): patches in peers[0].seq inside a loop over i — looks like it
 * should be peers[i].seq (the PEER_ROTATE branch below uses peers[i].seq);
 * confirm before changing, as only the full file shows all PEER_FILE uses. */
809 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
810 peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
811 ret = write(peers[i].write_fd, emit_packet, size);
814 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
815 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
816 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
820 #if ((DEBUG) & DEBUG_E)
822 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
823 emit_count, i + 1, peers[i].seq);
826 peers[i].seq += emit_count;
/* Optional -t rate limiting: sleep proportionally to bytes written. */
829 if (emit_rate_bytes) {
831 delay = sent / emit_rate_bytes;
833 sent %= emit_rate_bytes;
835 timeout.tv_nsec = emit_rate_delay * delay;
836 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
841 if (peers[i].type == PEER_MIRROR) goto sendreal;
/* ROTATE peers take turns: only the current rotation slot sends. */
843 if (peers[i].type == PEER_ROTATE)
844 if (peer_rot_cur++ == peer_rot_work) {
846 if (netflow->SeqOffset)
847 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
848 ret = send(peers[i].write_fd, emit_packet, size, 0);
850 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
851 my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
852 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
855 #if ((DEBUG) & DEBUG_E)
857 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
858 emit_count, i + 1, peers[i].seq);
861 peers[i].seq += emit_count;
864 if (emit_rate_bytes) {
866 delay = sent / emit_rate_bytes;
868 sent %= emit_rate_bytes;
870 timeout.tv_nsec = emit_rate_delay * delay;
871 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
876 if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
877 emit_sequence += emit_count;
879 #if ((DEBUG) & DEBUG_I)
/* Unpending thread: consumes completed entries from the circular pending
 * queue (producer: capture thread, which sets FLOW_PENDING and signals
 * unpending_cond) and copies each into the flow hash via put_into(COPY_INTO).
 * The queue slot itself is never freed — its flags are cleared and the tail
 * advances around the ring. */
886 void *unpending_thread()
889 struct timespec timeout;
890 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
897 pthread_mutex_lock(&unpending_mutex);
900 while (!(pending_tail->flags & FLOW_PENDING)) {
901 gettimeofday(&now, 0);
902 timeout.tv_sec = now.tv_sec + unpending_timeout;
903 pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
906 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
909 if (put_into(pending_tail, COPY_INTO
910 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
/* put_into failed (allocator exhausted) — the packet is dropped. */
914 #if ((DEBUG) & DEBUG_I)
915 pkts_lost_unpending++;
919 #if ((DEBUG) & DEBUG_U)
920 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
/* Release the slot back to the capture thread and advance the tail. */
923 pending_tail->flags = 0;
924 pending_tail = pending_tail->next;
925 #if ((DEBUG) & DEBUG_I)
/* Scan thread: every scan_interval seconds walks all hash buckets, moving
 * expired fragmented flows onto scan_frag_dreg and expired complete flows
 * (inactive or active timer elapsed) onto the flows_emit list for the emit
 * thread.  Expired fragments are then force-reinserted without FLOW_FRAG so
 * they age out through the normal path.  NOTE(review): enclosing function
 * header not visible in this fragment. */
933 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
937 struct Flow *flow, **flowpp;
939 struct timespec timeout;
944 pthread_mutex_lock(&scan_mutex);
948 timeout.tv_sec = now.sec + scan_interval;
949 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
952 #if ((DEBUG) & DEBUG_S)
953 my_log(LOG_DEBUG, "S: %d", now.sec);
955 for (i = 0; i < 1 << HASH_BITS ; i++) {
956 pthread_mutex_lock(&flows_mutex[i]);
960 if (flow->flags & FLOW_FRAG) {
961 /* Process fragmented flow */
962 if ((now.sec - flow->mtime.sec) > frag_lifetime) {
963 /* Fragmented flow expired - put it into special chain */
964 #if ((DEBUG) & DEBUG_I)
968 *flowpp = flow->next;
970 flow->flags &= ~FLOW_FRAG;
971 flow->next = scan_frag_dreg;
972 scan_frag_dreg = flow;
977 /* Flow is not fragmented */
978 if ((now.sec - flow->mtime.sec) > inactive_lifetime
979 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
981 #if ((DEBUG) & DEBUG_S)
982 my_log(LOG_DEBUG, "S: E %x", flow);
984 #if ((DEBUG) & DEBUG_I)
/* Unlink from the bucket and push onto the emit list. */
987 *flowpp = flow->next;
988 pthread_mutex_lock(&emit_mutex);
989 flow->next = flows_emit;
991 #if ((DEBUG) & DEBUG_I)
994 pthread_mutex_unlock(&emit_mutex);
999 flowpp = &flow->next;
1002 pthread_mutex_unlock(&flows_mutex[i]);
1004 if (flows_emit) pthread_cond_signal(&emit_cond);
/* Reinsert expired fragments as ordinary (non-fragment) flows. */
1006 while (scan_frag_dreg) {
1007 flow = scan_frag_dreg;
1008 scan_frag_dreg = flow->next;
1009 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1012 put_into(flow, MOVE_INTO
1013 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1017 #if ((DEBUG) & DEBUG_S)
1018 my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
/* Capture thread: reads packets from the netlink ULOG socket, parses the
 * IPv4/transport headers, and fills the next slot of the circular pending
 * queue, then marks it FLOW_PENDING for the unpending thread.  Non-IPv4 or
 * truncated packets are ignored; a full pending queue drops packets.
 * NOTE(review): enclosing function header not visible in this fragment. */
1026 struct ulog_packet_msg *ulog_msg;
1030 int len, off_frag, psize;
1031 #if ((DEBUG) & DEBUG_C)
1039 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1041 my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
/* One ipulog_read() may deliver several packets; iterate over all of them. */
1044 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1046 #if ((DEBUG) & DEBUG_C)
1047 sprintf(logbuf, "C: %d", ulog_msg->data_len);
1050 nl = (void *) &ulog_msg->payload;
1051 psize = ulog_msg->data_len;
/* Drop anything that is not a plausible IPv4 packet. */
1054 if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1055 #if ((DEBUG) & DEBUG_C)
1056 strcat(logbuf, " U");
1057 my_log(LOG_DEBUG, "%s", logbuf);
1059 #if ((DEBUG) & DEBUG_I)
/* A still-flagged head slot means the unpending thread has not caught up. */
1065 if (pending_head->flags) {
1066 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1068 # if ((DEBUG) & DEBUG_C)
1073 "pending queue full:", "packet lost");
1075 #if ((DEBUG) & DEBUG_I)
1076 pkts_lost_capture++;
1081 #if ((DEBUG) & DEBUG_I)
1085 flow = pending_head;
1087 /* ?FIXME? Add sanity check for ip_len? */
1088 flow->size = ntohs(nl->ip_len);
1089 #if ((DEBUG) & DEBUG_I)
1090 size_total += flow->size;
1093 flow->sip = nl->ip_src;
1094 flow->dip = nl->ip_dst;
/* Hard-coded test-flow tracer (matches the check in fill()). */
1095 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
1096 my_log(LOG_INFO, "Received test flow to corewars.org");
1098 flow->iif = snmp_index(ulog_msg->indev_name);
1099 flow->oif = snmp_index(ulog_msg->outdev_name);
1100 flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1101 flow->proto = nl->ip_p;
1103 flow->tcp_flags = 0;
1107 /* Packets captured from the OUTPUT table do not carry a valid timestamp */
1108 if (ulog_msg->timestamp_sec) {
1109 flow->ctime.sec = ulog_msg->timestamp_sec;
1110 flow->ctime.usec = ulog_msg->timestamp_usec;
1111 } else gettime(&flow->ctime);
1112 flow->mtime = flow->ctime;
/* Fragment offset in bytes (ip_off is in 8-byte units). */
1114 off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1117 Offset (from network layer) to transport layer header/IP data
1118 IOW IP header size ;-)
1121 Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1123 off_tl = nl->ip_hl << 2;
1124 tl = (void *) nl + off_tl;
1126 /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1127 flow->sizeF = ntohs(nl->ip_len) - off_tl;
1129 if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1130 if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1132 if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1133 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1134 #if ((DEBUG) & DEBUG_C)
1135 strcat(logbuf, " F");
1137 #if ((DEBUG) & DEBUG_I)
1138 pkts_total_fragmented++;
1140 flow->flags |= FLOW_FRAG;
1141 flow->id = nl->ip_id;
1143 if (!(ntohs(nl->ip_off) & IP_MF)) {
1144 /* Packet without IP_MF set (the last fragment) reveals the whole datagram size */
1145 flow->flags |= FLOW_LASTFRAG;
1146 /* size = frag_offset*8 + data_size */
1147 flow->sizeP = off_frag + flow->sizeF;
1151 #if ((DEBUG) & DEBUG_C)
1152 sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1153 strcat(logbuf, buf);
1154 sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1155 strcat(logbuf, buf);
1159 Fortunately most interesting transport layer information fit
1160 into first 8 bytes of IP data field (minimal nonzero size).
1161 Thus we don't need actual packet reassembling to build whole
1162 transport layer data. We only check the fragment offset for
1163 zero value to find packet with this information.
1165 if (!off_frag && psize >= 8) {
1166 switch (flow->proto) {
1169 flow->sp = ((struct udphdr *)tl)->uh_sport;
1170 flow->dp = ((struct udphdr *)tl)->uh_dport;
/* ICMP: export type/code in the port fields (common NetFlow convention). */
1175 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1176 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1179 #ifdef ICMP_TRICK_CISCO
1181 flow->dp = *((int32_t *) tl);
1186 /* Unknown transport layer */
1187 #if ((DEBUG) & DEBUG_C)
1188 strcat(logbuf, " U");
1195 #if ((DEBUG) & DEBUG_C)
1196 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1197 strcat(logbuf, buf);
1199 flow->flags |= FLOW_TL;
1203 /* Check for tcp flags presence (including CWR and ECE). */
1204 if (flow->proto == IPPROTO_TCP
1206 && psize >= 16 - off_frag) {
/* TCP flags byte sits at offset 13 of the TCP header. */
1207 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1208 #if ((DEBUG) & DEBUG_C)
1209 sprintf(buf, " TCP:%x", flow->tcp_flags);
1210 strcat(logbuf, buf);
1214 #if ((DEBUG) & DEBUG_C)
1215 sprintf(buf, " => %x", (unsigned) flow);
1216 strcat(logbuf, buf);
1217 my_log(LOG_DEBUG, "%s", logbuf);
1220 #if ((DEBUG) & DEBUG_I)
1222 pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1223 if (pending_queue_trace < pending_queue_trace_candidate)
1224 pending_queue_trace = pending_queue_trace_candidate;
1227 /* Flow complete - inform unpending_thread() about it */
1228 pending_head->flags |= FLOW_PENDING;
1229 pending_head = pending_head->next;
1231 pthread_cond_signal(&unpending_cond);
/* Entry point: parse options, configure peers (collectors and/or a log
 * file), open the ULOG capture handle, optionally daemonize and drop
 * privileges, initialize the allocator/hash/pending queue, install signal
 * handlers, and (past the end of this fragment) start the worker threads. */
1237 int main(int argc, char **argv)
1240 char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1241 int c, i, write_fd, memory_limit = 0;
1242 struct addrinfo hints, *res;
1243 struct sockaddr_in saddr;
1244 pthread_attr_t tattr;
1245 struct sigaction sigact;
1246 static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1247 struct timeval timeout;
1249 sched_min = sched_get_priority_min(SCHED);
1250 sched_max = sched_get_priority_max(SCHED);
1252 memset(&saddr, 0 , sizeof(saddr));
1253 memset(&hints, 0 , sizeof(hints));
1254 hints.ai_flags = AI_PASSIVE;
1255 hints.ai_family = AF_INET;
1256 hints.ai_socktype = SOCK_DGRAM;
1258 /* Process command line options */
1261 while ((c = my_getopt(argc, argv, parms)) != -1) {
/* Transfer parsed option arguments into the corresponding globals. */
1271 if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1272 if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1273 if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1274 if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1275 if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1276 if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1277 if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1278 if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1279 if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1280 if (parms[nflag].count) {
1281 switch (atoi(parms[nflag].arg)) {
1283 netflow = &NetFlow1;
1290 netflow = &NetFlow7;
1294 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1298 if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
/* -l [dst][:id]: optional log destination plus an identifier suffix. */
1299 if (parms[lflag].count) {
1300 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1303 sprintf(errpbuf, "[%s]", log_suffix);
1304 strcat(ident, errpbuf);
1307 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1308 if (log_suffix) *--log_suffix = ':';
1310 if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1312 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1315 sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1316 if (parms[qflag].count) {
1317 pending_queue_length = atoi(parms[qflag].arg);
1318 if (pending_queue_length < 1) {
1319 fprintf(stderr, "Illegal %s\n", "pending queue length");
1323 if (parms[rflag].count) {
1324 schedp.sched_priority = atoi(parms[rflag].arg);
1325 if (schedp.sched_priority
1326 && (schedp.sched_priority < sched_min
1327 || schedp.sched_priority > sched_max)) {
1328 fprintf(stderr, "Illegal %s\n", "realtime priority");
1332 if (parms[Bflag].count) {
1333 sockbufsize = atoi(parms[Bflag].arg) << 10;
1335 if (parms[bflag].count) {
1336 bulk_quantity = atoi(parms[bflag].arg);
1337 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1338 fprintf(stderr, "Illegal %s\n", "bulk size");
1342 if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
/* -X "nameBASE:...": parse interface -> SNMP index conversion rules. */
1343 if (parms[Xflag].count) {
1344 for(i = 0; parms[Xflag].arg[i]; i++)
1345 if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1346 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1348 rule = strtok(parms[Xflag].arg, ":");
1349 for (i = 0; rule; i++) {
1350 snmp_rules[i].len = strlen(rule);
1351 if (snmp_rules[i].len > IFNAMSIZ) {
1352 fprintf(stderr, "Illegal %s\n", "interface basename");
1355 strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1356 if (!*(rule - 1)) *(rule - 1) = ',';
1357 rule = strtok(NULL, ",");
1359 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1362 snmp_rules[i].base = atoi(rule);
1364 rule = strtok(NULL, ":");
1368 if (parms[tflag].count)
1369 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1370 if (parms[aflag].count) {
1371 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1373 fprintf(stderr, "Illegal %s\n", "source address");
1376 saddr = *((struct sockaddr_in *) res->ai_addr);
1380 if (parms[uflag].count)
1381 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1382 fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1387 /* Process collectors parameters. Brrrr... :-[ */
1389 npeers = argc - optind;
1391 /* Send to remote Netflow collector */
1392 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1393 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1395 if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1397 if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1398 fprintf(stderr, "socket(): %s\n", strerror(errno));
1401 peers[npeers].write_fd = write_fd;
1402 peers[npeers].type = PEER_MIRROR;
1403 peers[npeers].laddr = saddr;
1404 peers[npeers].seq = 0;
/* Optional "/local[/type]" suffix selects a local bind address and peer type. */
1405 if ((lhost = strchr(dport, '/'))) {
1407 if ((type = strchr(lhost, '/'))) {
1415 peers[npeers].type = PEER_ROTATE;
1424 if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1425 peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1429 if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1430 sizeof(struct sockaddr_in))) {
1431 fprintf(stderr, "bind(): %s\n", strerror(errno));
1434 if (getaddrinfo(dhost, dport, &hints, &res)) {
1436 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1439 peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1441 if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1442 sizeof(struct sockaddr_in))) {
1443 fprintf(stderr, "connect(): %s\n", strerror(errno));
1447 /* Restore command line */
1448 if (type) *--type = '/';
1449 if (lhost) *--lhost = '/';
/* -f only: write PDUs to a rotating file instead of a collector. */
1453 else if (parms[fflag].count) {
1455 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
/* NOTE(review): strnlen() does not count the NUL terminator, so this
 * allocation is one byte short for a full-length name, and the strncpy below
 * may leave fname unterminated — should be strnlen(...) + 1 with explicit
 * termination. */
1456 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1457 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1459 peers[npeers].write_fd = START_VALUE;
1460 peers[npeers].type = PEER_FILE;
1461 peers[npeers].seq = 0;
/* Open the ULOG capture handle before dropping privileges. */
1470 if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1471 ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1473 fprintf(stderr, "libipulog initialization error: %s",
1474 ipulog_strerror(ipulog_errno));
/* -B: enlarge the kernel-side receive buffer on the netlink socket. */
1478 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1479 &sockbufsize, sizeof(sockbufsize)) < 0)
1480 fprintf(stderr, "setsockopt(): %s", strerror(errno));
1482 /* Daemonize (if log destination stdout-free) */
1484 my_log_open(ident, verbosity, log_dest);
1485 if (!(log_dest & 2)) {
1486 /* Crash-proofing - Sapan*/
1490 fprintf(stderr, "fork(): %s", strerror(errno));
1495 freopen("/dev/null", "r", stdin);
1496 freopen("/dev/null", "w", stdout);
1497 freopen("/dev/null", "w", stderr);
/* Parent acts as a supervisor: re-fork the worker if it dies. */
1501 while (wait3(NULL,0,NULL) < 1);
1505 setvbuf(stdout, (char *)0, _IONBF, 0);
1506 setvbuf(stderr, (char *)0, _IONBF, 0);
1510 sprintf(errpbuf, "[%ld]", (long) pid);
1511 strcat(ident, errpbuf);
1513 /* Initialization */
1515 hash_init(); /* Actually for crc16 only */
1516 mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1517 for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1520 /* Hope 12 days is enough :-/ */
1521 start_time_offset = 1 << 20;
1523 /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1525 gettime(&start_time);
1528 Build static pending queue as circular buffer.
1530 if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1531 pending_tail = pending_head;
1532 for (i = pending_queue_length - 1; i--;) {
1533 if (!(pending_tail->next = mem_alloc())) {
1535 my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1538 pending_tail = pending_tail->next;
1540 pending_tail->next = pending_head;
1541 pending_tail = pending_head;
/* Install SIGTERM (and, with DEBUG_I, SIGUSR1) handlers, then block the
 * signals in all threads so only the designated thread receives them. */
1543 sigemptyset(&sig_mask);
1544 sigact.sa_handler = &sighandler;
1545 sigact.sa_mask = sig_mask;
1546 sigact.sa_flags = 0;
1547 sigaddset(&sig_mask, SIGTERM);
1548 sigaction(SIGTERM, &sigact, 0);
1549 #if ((DEBUG) & DEBUG_I)
1550 sigaddset(&sig_mask, SIGUSR1);
1551 sigaction(SIGUSR1, &sigact, 0);
1553 if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1554 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1558 my_log(LOG_INFO, "Starting %s...", VERSION);
1560 if (parms[cflag].count) {
1561 if (chdir(parms[cflag].arg) || chroot(".")) {
1562 my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1567 schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1568 pthread_attr_init(&tattr);
1569 for (i = 0; i < THREADS - 1; i++) {
1570 if (schedp.sched_priority > 0) {
1571 if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1572 (pthread_attr_setschedparam(&tattr, &schedp))) {
1573 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1577 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1578 my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1581 pthread_detach(thid);
1582 schedp.sched_priority++;
1586 if (setgroups(0, NULL)) {
1587 my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1590 if (setregid(pw->pw_gid, pw->pw_gid)) {
1591 my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1594 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1595 my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1600 if (!(pidfile = fopen(pidfilepath, "w")))
1601 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1603 fprintf(pidfile, "%ld\n", (long) pid);
1607 my_log(LOG_INFO, "pid: %d", pid);
1608 my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1609 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1610 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1611 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1612 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1613 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1614 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1615 for (i = 0; i < nsnmp_rules; i++) {
1616 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1617 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1619 for (i = 0; i < npeers; i++) {
1620 switch (peers[i].type) {
1628 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1629 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1630 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1633 pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1635 timeout.tv_usec = 0;
1637 || (total_elements - free_elements - pending_queue_length)
1639 || pending_tail->flags) {
1642 timeout.tv_sec = scan_interval;
1643 select(0, 0, 0, 0, &timeout);
1646 if (sigs & SIGTERM_MASK && !killed) {
1647 sigs &= ~SIGTERM_MASK;
1648 my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1651 active_lifetime = -1;
1652 inactive_lifetime = -1;
1654 unpending_timeout = 1;
1656 pthread_cond_signal(&scan_cond);
1657 pthread_cond_signal(&unpending_cond);
1660 #if ((DEBUG) & DEBUG_I)
1661 if (sigs & SIGUSR1_MASK) {
1662 sigs &= ~SIGUSR1_MASK;
1667 remove(pidfilepath);
1668 #if ((DEBUG) & DEBUG_I)
1671 my_log(LOG_INFO, "Done.");