2 Copyright (C) Slava Astashonok <sla@0n.ru>
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License.
7 $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
9 Sapan Bhatia <sapanb@cs.princeton.edu>
11 7/11/2007 Added data collection (-f) functionality, xid support in the header and log file
14 15/11/2007 Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
20 /* stdout, stderr, freopen() */
26 /* getopt(), alarm(), getpid(), setsid(), chdir() */
39 #include <libipulog/libipulog.h>
40 struct ipulog_handle {
43 struct sockaddr_nl local;
44 struct sockaddr_nl peer;
45 struct nlmsghdr* last_nlhdr;
48 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
49 #include <sys/types.h>
50 #include <netinet/in_systm.h>
51 #include <sys/socket.h>
52 #include <netinet/in.h>
53 #include <arpa/inet.h>
54 #include <netinet/ip.h>
55 #include <netinet/tcp.h>
56 #include <netinet/udp.h>
57 #include <netinet/ip_icmp.h>
60 #include <sys/param.h>
85 #include <sys/select.h>
91 #include <fprobe-ulog.h>
93 #include <my_getopt.h>
98 #define PIDFILE "/var/log/fprobe-ulog.pid"
/* Command-line option table for my_getopt(); every flag listed here takes a
 * required argument.  NOTE(review): this listing appears sampled -- some
 * entries (e.g. flag-only options) and the table terminator are not visible. */
128 static struct getopt_parms parms[] = {
129 {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
130 {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
131 {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
132 {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
133 {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
134 {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
135 {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
136 {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
137 {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
138 {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
140 {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
141 {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
143 {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
144 {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
145 {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
146 {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
147 {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
148 {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
149 {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
150 {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
151 {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
152 {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
/* getopt state and NetFlow template descriptors defined elsewhere. */
157 extern int optind, opterr, optopt;
160 extern struct NetFlow NetFlow1;
161 extern struct NetFlow NetFlow5;
162 extern struct NetFlow NetFlow7;
/* Sentinel for peers[].write_fd meaning "log file not opened yet". */
164 #define START_VALUE -5
165 #define mark_is_tos parms[Mflag].count
/* Tunables, overridable from the command line (defaults shown in usage()). */
166 static unsigned scan_interval = 5;
167 static int min_free = 0;
168 static int frag_lifetime = 30;
169 static int inactive_lifetime = 60;
170 static int active_lifetime = 300;
171 static int sockbufsize;
172 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
173 #if (MEM_BITS == 0) || (MEM_BITS == 16)
174 #define BULK_QUANTITY 10000
176 #define BULK_QUANTITY 200
/* Epoch-based log rotation state (minutes per epoch, epochs kept, current). */
179 static unsigned epoch_length=60, log_epochs=1;
180 static unsigned cur_epoch=0,prev_uptime=0;
182 static unsigned bulk_quantity = BULK_QUANTITY;
183 static unsigned pending_queue_length = 100;
184 static struct NetFlow *netflow = &NetFlow5;
185 static unsigned verbosity = 6;
186 static unsigned log_dest = MY_LOG_SYSLOG;
187 static struct Time start_time;
188 static long start_time_offset;
/* Memory-pool statistics exported by the mem_* allocator module. */
191 extern unsigned total_elements;
192 extern unsigned free_elements;
193 extern unsigned total_memory;
/* Instrumentation counters, compiled in only with DEBUG_I. */
194 #if ((DEBUG) & DEBUG_I)
195 static unsigned emit_pkts, emit_queue;
196 static uint64_t size_total;
197 static unsigned pkts_total, pkts_total_fragmented;
198 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
199 static unsigned pkts_pending, pkts_pending_done;
200 static unsigned pending_queue_trace, pending_queue_trace_candidate;
201 static unsigned flows_total, flows_fragmented;
/* Emitter state: flows per PDU, export sequence, optional rate limiting. */
203 static unsigned emit_count;
204 static uint32_t emit_sequence;
205 static unsigned emit_rate_bytes, emit_rate_delay;
206 static struct Time emit_time;
207 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
208 static pthread_t thid;
209 static sigset_t sig_mask;
210 static struct sched_param schedp;
211 static int sched_min, sched_max;
212 static int npeers, npeers_rot;
213 static struct peer *peers;
/* Hash table of active flows, one mutex per bucket. */
216 static struct Flow *flows[1 << HASH_BITS];
217 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
/* Synchronization between the capture, unpending, scan and emit threads. */
219 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
220 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
222 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
223 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
224 static struct Flow *pending_head, *pending_tail;
225 static struct Flow *scan_frag_dreg;
227 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
228 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
229 static struct Flow *flows_emit;
/* Process identity, pidfile and ULOG capture handles. */
231 static char ident[256] = "fprobe-ulog";
232 static FILE *pidfile;
233 static char *pidfilepath;
236 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
237 static struct ipulog_handle *ulog_handle;
238 static uint32_t ulog_gmask = 1;
239 static char *cap_buf;
240 static int nsnmp_rules;
241 static struct snmp_rule *snmp_rules;
242 static struct passwd *pw = 0;
/* Help text printed by usage(); the enclosing function header and the
 * fprintf call are not visible in this view.  NOTE(review): the -l line
 * contains a user-visible typo "idetifier" (should be "identifier") -- it is
 * a runtime string, so fixing it is a code change, not a comment change. */
247 "fprobe-ulog: a NetFlow probe. Version %s\n"
248 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
250 "-h\t\tDisplay this help\n"
251 "-U <mask>\tULOG group bitwise mask [1]\n"
252 "-s <seconds>\tHow often scan for expired flows [5]\n"
253 "-g <seconds>\tFragmented flow lifetime [30]\n"
254 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
255 "-f <filename>\tLog flow data in a file\n"
256 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
257 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
258 "-a <address>\tUse <address> as source for NetFlow flow\n"
259 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
260 "-M\t\tUse netfilter mark value as ToS flag\n"
261 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
262 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
263 "-q <flows>\tPending queue length [100]\n"
264 "-B <kilobytes>\tKernel capture buffer size [0]\n"
265 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
266 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
267 "-c <directory>\tDirectory to chroot to\n"
268 "-u <user>\tUser to run as\n"
269 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
270 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
271 "-y <remote:port>\tAddress of the NetFlow collector\n"
272 "-f <writable file>\tFile to write data into\n"
273 "-T <n>\tRotate log file every n epochs\n"
274 "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
275 "-E <[1..60]>\tSize of an epoch in minutes\n"
276 "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
278 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
/* Debug-counter dump (DEBUG_I builds only): logs capture/pending/cache/emit
 * and memory-pool statistics.  The enclosing function header is not visible
 * in this view. */
282 #if ((DEBUG) & DEBUG_I)
285 my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
286 pkts_total, pkts_total_fragmented, size_total,
287 pkts_pending - pkts_pending_done, pending_queue_trace);
288 my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
289 pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
290 my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
291 flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
292 my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
293 total_elements, free_elements, total_memory);
/* Async-signal handler: records the received signal in the `sigs` bitmask so
 * the main loop can act on it later (SIGTERM -> terminate; SIGUSR1 -> dump
 * DEBUG_I counters).  NOTE(review): interior lines (the dispatch on `sig`)
 * are missing from this sampled view. */
297 void sighandler(int sig)
301 sigs |= SIGTERM_MASK;
303 #if ((DEBUG) & DEBUG_I)
305 sigs |= SIGUSR1_MASK;
/* Fills *now with the current wall-clock time (sec/usec).  The source of `t`
 * is not visible here -- presumably gettimeofday(); TODO confirm. */
311 void gettime(struct Time *now)
317 now->usec = t.tv_usec;
/* Difference t1 - t2 in whole minutes (seconds field only; usec ignored). */
321 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
323 return (t1->sec - t2->sec)/60;
/* Difference t1 - t2 in milliseconds, combining the sec and usec fields. */
326 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
328 return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
331 /* Uptime in milliseconds */
/* Milliseconds elapsed since start_time, truncated to 32 bits; used for the
 * NetFlow uptime/FIRST_SWITCHED/LAST_SWITCHED fields filled in fill(). */
332 uint32_t getuptime(struct Time *t)
334 /* Maximum uptime is about 49/2 days (32-bit millisecond counter wraps) */
335 return cmpmtime(t, &start_time);
338 /* Uptime in minutes */
/* Minutes elapsed since start_time; used by get_log_fd() for epoch rotation. */
339 uint32_t getuptime_minutes(struct Time *t)
341 /* Maximum uptime is about 49/2 days */
342 return cmpMtime(t, &start_time);
/* Hash of the flow key.  Fragmented flows are hashed over the shorter
 * Flow_F prefix (which excludes the transport ports not yet known for
 * fragments); complete flows over the full Flow_TL key. */
345 hash_t hash_flow(struct Flow *flow)
347 if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
348 else return hash(flow, sizeof(struct Flow_TL));
/* Maps an interface name to an SNMP ifIndex: first via the user-supplied -X
 * rules (basename prefix + numeric suffix + base), then falling back to the
 * kernel's if_nametoindex().  Returns 0 for an empty name. */
351 uint16_t snmp_index(char *name) {
354 if (!*name) return 0;
356 for (i = 0; (int) i < nsnmp_rules; i++) {
357 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
358 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
361 if ((i = if_nametoindex(name))) return i;
/* Field-by-field copy of *src into *dst.  NOTE(review): some assignments
 * (addresses, ports, interface indices) are missing from this sampled view. */
366 inline void copy_flow(struct Flow *src, struct Flow *dst)
373 dst->proto = src->proto;
374 dst->tcp_flags = src->tcp_flags;
378 dst->pkts = src->pkts;
379 dst->size = src->size;
380 dst->sizeF = src->sizeF;
381 dst->sizeP = src->sizeP;
382 dst->ctime = src->ctime;
383 dst->mtime = src->mtime;
384 dst->flags = src->flags;
/* Restores cur_epoch from /tmp/fprobe_last_epoch (written by
 * update_cur_epoch_file) and advances past the last used epoch so a restart
 * does not overwrite it. */
387 void get_cur_epoch() {
389 fd = open("/tmp/fprobe_last_epoch",O_RDONLY);
393 len = read(fd, snum, sizeof(snum)-1);
396 sscanf(snum,"%d",&cur_epoch);
397 cur_epoch++; /* Let's not stone the last epoch */
/* Persists the current epoch id to /tmp/fprobe_last_epoch for restart
 * recovery.  NOTE(review): open() uses O_CREAT without a mode argument --
 * undefined permissions on first creation; should pass e.g. 0644.  The
 * write() return value is also unchecked. */
405 void update_cur_epoch_file(int n) {
408 len=snprintf(snum,6,"%d",n);
409 fd = open("/tmp/fprobe_last_epoch",O_WRONLY|O_CREAT|O_TRUNC);
411 my_log(LOG_ERR, "open() failed: /tmp/fprobe_last_epoch.The next restart will resume logging from epoch id 0.");
414 write(fd, snum, len);
/* Returns the fd to write flow data to, rotating to "<fname>.<epoch>" when
 * the epoch expires, the fd is invalid, or disk space runs low.  cur_fd ==
 * START_VALUE means no file has been opened yet. */
418 unsigned get_log_fd(char *fname, int cur_fd) {
421 /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
422 * doesn't solve the problem */
424 struct statfs statfs;
427 cur_uptime = getuptime_minutes(&now);
430 if (cur_fd!=START_VALUE) {
431 if (fstatfs(cur_fd, &statfs) == -1) {
432 my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
/* Low-disk policy: if even the first epoch exhausted the disk, give up;
 * otherwise wrap around and start overwriting the oldest epoch files. */
435 if (min_free && (statfs.f_bavail < min_free))
437 case 0: /* Uh oh. Our first file filled up all of the free space. Just bail out. */
438 my_log(LOG_ERR, "The first epoch filled up all the free space on disk. Bailing out.");
441 my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
447 /* Epoch length in minutes */
448 if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
449 char nextname[MAX_PATH_LEN];
451 prev_uptime = cur_uptime;
452 cur_epoch = (cur_epoch + 1) % log_epochs;
455 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
/* NOTE(review): O_CREAT without a mode argument -- undefined permissions
 * for newly created log files; should pass e.g. 0644. */
456 if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC)) < 0) {
457 my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
460 update_cur_epoch_file(cur_epoch);
/* Walks the bucket chain `where` looking for a flow matching `what`:
 * same src/dst address and protocol, then either matching ports (both
 * unfragmented) or matching IP id (both fragmented).  If `prev` is non-NULL
 * it is used as the starting link pointer on entry and receives the address
 * of the link pointing at the match (or chain end) for unlinking. */
469 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
471 struct Flow **flowpp;
477 if (prev) flowpp = *prev;
480 if (where->sip.s_addr == what->sip.s_addr
481 && where->dip.s_addr == what->dip.s_addr
482 && where->proto == what->proto) {
483 switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
485 /* Both unfragmented */
486 if ((what->sp == where->sp)
487 && (what->dp == where->dp)) goto done;
490 /* Both fragmented */
491 if (where->id == what->id) goto done;
495 flowpp = &where->next;
499 if (prev) *prev = flowpp;
/* Inserts or merges `flow` into the flow hash table under the bucket mutex.
 * flag == COPY_INTO: `flow` stays owned by the caller and is copied into a
 * freshly allocated element; MOVE_INTO: `flow` itself is consumed (freed on
 * merge).  When the last fragment of a datagram completes the flow, it is
 * unlinked, stripped of FLOW_FRAG and re-inserted recursively as a normal
 * flow.  Returns a status code (return statements not visible in this
 * sampled view). */
503 int put_into(struct Flow *flow, int flag
504 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
511 struct Flow *flown, **flowpp;
512 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
517 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
518 sprintf(buf, " %x H:%04x", (unsigned) flow, h);
521 pthread_mutex_lock(&flows_mutex[h]);
523 if (!(flown = find(flows[h], flow, &flowpp))) {
524 /* No suitable flow found - add */
525 if (flag == COPY_INTO) {
526 if ((flown = mem_alloc())) {
527 copy_flow(flow, flown);
530 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
531 my_log(LOG_ERR, "%s %s. %s",
532 "mem_alloc():", strerror(errno), "packet lost");
537 flow->next = flows[h];
539 #if ((DEBUG) & DEBUG_I)
541 if (flow->flags & FLOW_FRAG) flows_fragmented++;
543 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
545 sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
550 /* Found suitable flow - update */
551 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
552 sprintf(buf, " +> %x", (unsigned) flown);
/* Keep the earliest creation time and the latest modification time. */
555 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
556 flown->mtime = flow->mtime;
557 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
558 flown->ctime = flow->ctime;
559 flown->tcp_flags |= flow->tcp_flags;
560 flown->size += flow->size;
561 flown->pkts += flow->pkts;
562 if (flow->flags & FLOW_FRAG) {
563 /* A fragmented flow requires some additional work */
564 if (flow->flags & FLOW_TL) {
567 Several packets with FLOW_TL (attack)
569 flown->sp = flow->sp;
570 flown->dp = flow->dp;
572 if (flow->flags & FLOW_LASTFRAG) {
575 Several packets with FLOW_LASTFRAG (attack)
577 flown->sizeP = flow->sizeP;
579 flown->flags |= flow->flags;
580 flown->sizeF += flow->sizeF;
581 if ((flown->flags & FLOW_LASTFRAG)
582 && (flown->sizeF >= flown->sizeP)) {
583 /* All fragments received - flow reassembled */
584 *flowpp = flown->next;
585 pthread_mutex_unlock(&flows_mutex[h]);
586 #if ((DEBUG) & DEBUG_I)
591 flown->flags &= ~FLOW_FRAG;
592 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
/* Re-insert the reassembled flow as a normal (unfragmented) one. */
595 ret = put_into(flown, MOVE_INTO
596 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
602 if (flag == MOVE_INTO) mem_free(flow);
604 pthread_mutex_unlock(&flows_mutex[h]);
/* Serializes `fields` NetFlow fields described by `format` into the buffer
 * at `p`, reading values from `flow` (record fields) or from the emitter
 * globals (header fields: version, count, uptime, export timestamp).
 * Returns the advanced write pointer.  The switch statement's opening line
 * is not visible in this sampled view. */
608 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
612 for (i = 0; i < fields; i++) {
613 #if ((DEBUG) & DEBUG_F)
614 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
617 case NETFLOW_IPV4_SRC_ADDR:
618 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
619 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
622 case NETFLOW_IPV4_DST_ADDR:
623 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
/* NOTE(review): hard-coded test hook -- logs whenever the destination is
 * 64.34.177.39 (corewars.org); see the matching hook in the capture path. */
624 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
625 my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
627 p += NETFLOW_IPV4_DST_ADDR_SIZE;
630 case NETFLOW_INPUT_SNMP:
631 *((uint16_t *) p) = htons(flow->iif);
632 p += NETFLOW_INPUT_SNMP_SIZE;
635 case NETFLOW_OUTPUT_SNMP:
636 *((uint16_t *) p) = htons(flow->oif);
637 p += NETFLOW_OUTPUT_SNMP_SIZE;
640 case NETFLOW_PKTS_32:
641 *((uint32_t *) p) = htonl(flow->pkts);
642 p += NETFLOW_PKTS_32_SIZE;
645 case NETFLOW_BYTES_32:
646 *((uint32_t *) p) = htonl(flow->size);
647 p += NETFLOW_BYTES_32_SIZE;
650 case NETFLOW_FIRST_SWITCHED:
651 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
652 p += NETFLOW_FIRST_SWITCHED_SIZE;
655 case NETFLOW_LAST_SWITCHED:
656 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
657 p += NETFLOW_LAST_SWITCHED_SIZE;
/* Ports are stored in network byte order already -- no htons() here. */
660 case NETFLOW_L4_SRC_PORT:
661 *((uint16_t *) p) = flow->sp;
662 p += NETFLOW_L4_SRC_PORT_SIZE;
665 case NETFLOW_L4_DST_PORT:
666 *((uint16_t *) p) = flow->dp;
667 p += NETFLOW_L4_DST_PORT_SIZE;
671 *((uint8_t *) p) = flow->proto;
672 p += NETFLOW_PROT_SIZE;
675 case NETFLOW_SRC_TOS:
676 *((uint8_t *) p) = flow->tos;
677 p += NETFLOW_SRC_TOS_SIZE;
680 case NETFLOW_TCP_FLAGS:
681 *((uint8_t *) p) = flow->tcp_flags;
682 p += NETFLOW_TCP_FLAGS_SIZE;
/* Header fields below read emitter globals; `flow` is NULL for these. */
685 case NETFLOW_VERSION:
686 *((uint16_t *) p) = htons(netflow->Version);
687 p += NETFLOW_VERSION_SIZE;
691 *((uint16_t *) p) = htons(emit_count);
692 p += NETFLOW_COUNT_SIZE;
696 *((uint32_t *) p) = htonl(getuptime(&emit_time));
697 p += NETFLOW_UPTIME_SIZE;
700 case NETFLOW_UNIX_SECS:
701 *((uint32_t *) p) = htonl(emit_time.sec);
702 p += NETFLOW_UNIX_SECS_SIZE;
705 case NETFLOW_UNIX_NSECS:
706 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
707 p += NETFLOW_UNIX_NSECS_SIZE;
710 case NETFLOW_FLOW_SEQUENCE:
711 //*((uint32_t *) p) = htonl(emit_sequence);
712 *((uint32_t *) p) = 0;
713 p += NETFLOW_FLOW_SEQUENCE_SIZE;
717 /* Unsupported (uint8_t) */
718 case NETFLOW_ENGINE_TYPE:
719 case NETFLOW_ENGINE_ID:
720 case NETFLOW_FLAGS7_1:
721 case NETFLOW_SRC_MASK:
722 case NETFLOW_DST_MASK:
723 *((uint8_t *) p) = 0;
724 p += NETFLOW_PAD8_SIZE;
/* xid support: the flow's ToS value is exported in the XID slot. */
727 *((uint16_t *) p) = flow->tos;
728 p += NETFLOW_XID_SIZE;
731 /* Unsupported (uint16_t) */
734 case NETFLOW_FLAGS7_2:
735 *((uint16_t *) p) = 0;
736 p += NETFLOW_PAD16_SIZE;
740 /* Unsupported (uint32_t) */
741 case NETFLOW_IPV4_NEXT_HOP:
742 case NETFLOW_ROUTER_SC:
743 *((uint32_t *) p) = 0;
744 p += NETFLOW_PAD32_SIZE;
748 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
749 format, i, format[i]);
753 #if ((DEBUG) & DEBUG_F)
754 my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
/* Emit thread body (function header not visible in this sampled view):
 * drains flows_emit into the NetFlow PDU buffer and, when the PDU is full
 * or the emit timeout fires, writes it to every configured peer (file,
 * mirror or rotate collectors), with optional byte-rate pacing. */
761 Workaround for clone()-based threads
762 Try to change EUID independently of main thread
766 setregid(pw->pw_gid, pw->pw_gid);
767 setreuid(pw->pw_uid, pw->pw_uid);
776 struct timespec timeout;
777 int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
779 p = (void *) &emit_packet + netflow->HeaderSize;
785 pthread_mutex_lock(&emit_mutex);
786 while (!flows_emit) {
787 gettimeofday(&now, 0);
788 timeout.tv_sec = now.tv_sec + emit_timeout;
789 /* Do not wait until emit_packet is filled - it may take too long */
790 if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
791 pthread_mutex_unlock(&emit_mutex);
796 flows_emit = flows_emit->next;
797 #if ((DEBUG) & DEBUG_I)
800 pthread_mutex_unlock(&emit_mutex);
804 gettime(&start_time);
805 start_time.sec -= start_time_offset;
808 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
812 printf("Emit count = %d\n", emit_count);
815 if (emit_count == netflow->MaxFlows) {
818 p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
819 size = netflow->HeaderSize + emit_count * netflow->FlowSize;
820 /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
821 #ifdef STD_NETFLOW_PDU
822 if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
825 for (i = 0; i < npeers; i++) {
826 if (peers[i].type == PEER_FILE) {
827 if (netflow->SeqOffset)
/* NOTE(review): writes peers[0].seq into the PDU even for peer i --
 * looks like it should be peers[i].seq; verify against upstream. */
828 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
829 peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
830 ret = write(peers[i].write_fd, emit_packet, size);
833 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
834 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
835 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
839 #if ((DEBUG) & DEBUG_E)
841 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
842 emit_count, i + 1, peers[i].seq);
845 peers[i].seq += emit_count;
/* Optional pacing: sleep emit_rate_delay ns per emit_rate_bytes sent. */
848 if (emit_rate_bytes) {
850 delay = sent / emit_rate_bytes;
852 sent %= emit_rate_bytes;
854 timeout.tv_nsec = emit_rate_delay * delay;
855 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
860 if (peers[i].type == PEER_MIRROR) goto sendreal;
862 if (peers[i].type == PEER_ROTATE)
863 if (peer_rot_cur++ == peer_rot_work) {
865 if (netflow->SeqOffset)
866 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
867 ret = send(peers[i].write_fd, emit_packet, size, 0);
869 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
870 my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
871 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
874 #if ((DEBUG) & DEBUG_E)
876 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
877 emit_count, i + 1, peers[i].seq);
880 peers[i].seq += emit_count;
883 if (emit_rate_bytes) {
885 delay = sent / emit_rate_bytes;
887 sent %= emit_rate_bytes;
889 timeout.tv_nsec = emit_rate_delay * delay;
890 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
895 if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
896 emit_sequence += emit_count;
898 #if ((DEBUG) & DEBUG_I)
/* Unpending thread: consumes completed entries from the capture ring
 * (pending_tail..pending_head), merging each into the flow hash table via
 * put_into(COPY_INTO), then clears the slot for reuse by the capture
 * thread.  Sleeps on unpending_cond while the tail entry is not yet marked
 * FLOW_PENDING. */
905 void *unpending_thread()
908 struct timespec timeout;
909 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
916 pthread_mutex_lock(&unpending_mutex);
919 while (!(pending_tail->flags & FLOW_PENDING)) {
920 gettimeofday(&now, 0);
921 timeout.tv_sec = now.tv_sec + unpending_timeout;
922 pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
925 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
928 if (put_into(pending_tail, COPY_INTO
929 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
933 #if ((DEBUG) & DEBUG_I)
934 pkts_lost_unpending++;
938 #if ((DEBUG) & DEBUG_U)
939 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
942 pending_tail->flags = 0;
943 pending_tail = pending_tail->next;
944 #if ((DEBUG) & DEBUG_I)
/* Scan thread body (function header not visible in this sampled view):
 * every scan_interval seconds walks all hash buckets, moves expired
 * fragmented flows onto the scan_frag_dreg chain and expired/aged normal
 * flows onto flows_emit, then re-inserts the deregistered fragments as
 * normal flows via put_into(MOVE_INTO). */
952 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
956 struct Flow *flow, **flowpp;
958 struct timespec timeout;
963 pthread_mutex_lock(&scan_mutex);
967 timeout.tv_sec = now.sec + scan_interval;
968 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
971 #if ((DEBUG) & DEBUG_S)
972 my_log(LOG_DEBUG, "S: %d", now.sec);
974 for (i = 0; i < 1 << HASH_BITS ; i++) {
975 pthread_mutex_lock(&flows_mutex[i]);
979 if (flow->flags & FLOW_FRAG) {
980 /* Process fragmented flow */
981 if ((now.sec - flow->mtime.sec) > frag_lifetime) {
982 /* Fragmented flow expired - put it into special chain */
983 #if ((DEBUG) & DEBUG_I)
987 *flowpp = flow->next;
989 flow->flags &= ~FLOW_FRAG;
990 flow->next = scan_frag_dreg;
991 scan_frag_dreg = flow;
996 /* Flow is not fragmented */
997 if ((now.sec - flow->mtime.sec) > inactive_lifetime
998 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
1000 #if ((DEBUG) & DEBUG_S)
1001 my_log(LOG_DEBUG, "S: E %x", flow)
1003 #if ((DEBUG) & DEBUG_I)
1006 *flowpp = flow->next;
1007 pthread_mutex_lock(&emit_mutex);
1008 flow->next = flows_emit;
1010 #if ((DEBUG) & DEBUG_I)
1013 pthread_mutex_unlock(&emit_mutex);
1018 flowpp = &flow->next;
1021 pthread_mutex_unlock(&flows_mutex[i]);
1023 if (flows_emit) pthread_cond_signal(&emit_cond);
1025 while (scan_frag_dreg) {
1026 flow = scan_frag_dreg;
1027 scan_frag_dreg = flow->next;
1028 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1031 put_into(flow, MOVE_INTO
1032 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1036 #if ((DEBUG) & DEBUG_S)
1037 my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
/* Capture thread body (function header not visible in this sampled view):
 * reads packets from the netfilter ULOG socket, parses the IPv4 and
 * transport headers into the next free slot of the pending ring
 * (pending_head), and signals unpending_thread when a slot is complete. */
1045 struct ulog_packet_msg *ulog_msg;
1049 int len, off_frag, psize;
1050 #if ((DEBUG) & DEBUG_C)
1058 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1060 my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1063 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1065 #if ((DEBUG) & DEBUG_C)
1066 sprintf(logbuf, "C: %d", ulog_msg->data_len);
1069 nl = (void *) &ulog_msg->payload;
1070 psize = ulog_msg->data_len;
/* Ignore anything that is not a (complete enough) IPv4 header. */
1073 if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1074 #if ((DEBUG) & DEBUG_C)
1075 strcat(logbuf, " U");
1076 my_log(LOG_DEBUG, "%s", logbuf);
1078 #if ((DEBUG) & DEBUG_I)
/* Ring full: the slot at pending_head has not been consumed yet. */
1084 if (pending_head->flags) {
1085 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1087 # if ((DEBUG) & DEBUG_C)
1092 "pending queue full:", "packet lost");
1094 #if ((DEBUG) & DEBUG_I)
1095 pkts_lost_capture++;
1100 #if ((DEBUG) & DEBUG_I)
1104 flow = pending_head;
1106 /* ?FIXME? Add sanity check for ip_len? */
1107 flow->size = ntohs(nl->ip_len);
1108 #if ((DEBUG) & DEBUG_I)
1109 size_total += flow->size;
1112 flow->sip = nl->ip_src;
1113 flow->dip = nl->ip_dst;
/* NOTE(review): hard-coded test hook for destination 64.34.177.39
 * (corewars.org); mirrored in fill(). */
1114 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
1115 my_log(LOG_INFO, "Received test flow to corewars.org");
1117 flow->iif = snmp_index(ulog_msg->indev_name);
1118 flow->oif = snmp_index(ulog_msg->outdev_name);
/* -M option: export the netfilter mark in place of the IP ToS byte. */
1119 flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1120 flow->proto = nl->ip_p;
1122 flow->tcp_flags = 0;
1126 /* Packets captured from the OUTPUT table don't contain a valid timestamp */
1127 if (ulog_msg->timestamp_sec) {
1128 flow->ctime.sec = ulog_msg->timestamp_sec;
1129 flow->ctime.usec = ulog_msg->timestamp_usec;
1130 } else gettime(&flow->ctime);
1131 flow->mtime = flow->ctime;
/* Fragment offset in bytes (IP header stores it in 8-byte units). */
1133 off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1136 Offset (from network layer) to transport layer header/IP data
1137 IOW IP header size ;-)
1140 Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1142 off_tl = nl->ip_hl << 2;
1143 tl = (void *) nl + off_tl;
1145 /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1146 flow->sizeF = ntohs(nl->ip_len) - off_tl;
1148 if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1149 if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1151 if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1152 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1153 #if ((DEBUG) & DEBUG_C)
1154 strcat(logbuf, " F");
1156 #if ((DEBUG) & DEBUG_I)
1157 pkts_total_fragmented++;
1159 flow->flags |= FLOW_FRAG;
1160 flow->id = nl->ip_id;
1162 if (!(ntohs(nl->ip_off) & IP_MF)) {
1163 /* The fragment without IP_MF (the last one) tells us the whole datagram size */
1164 flow->flags |= FLOW_LASTFRAG;
1165 /* size = frag_offset*8 + data_size */
1166 flow->sizeP = off_frag + flow->sizeF;
1170 #if ((DEBUG) & DEBUG_C)
1171 sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1172 strcat(logbuf, buf);
1173 sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1174 strcat(logbuf, buf);
1178 Fortunately most interesting transport layer information fit
1179 into first 8 bytes of IP data field (minimal nonzero size).
1180 Thus we don't need actual packet reassembling to build whole
1181 transport layer data. We only check the fragment offset for
1182 zero value to find packet with this information.
1184 if (!off_frag && psize >= 8) {
1185 switch (flow->proto) {
1188 flow->sp = ((struct udphdr *)tl)->uh_sport;
1189 flow->dp = ((struct udphdr *)tl)->uh_dport;
/* ICMP: export type/code in the port fields (NetFlow convention). */
1194 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1195 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1198 #ifdef ICMP_TRICK_CISCO
1200 flow->dp = *((int32_t *) tl);
1205 /* Unknown transport layer */
1206 #if ((DEBUG) & DEBUG_C)
1207 strcat(logbuf, " U");
1214 #if ((DEBUG) & DEBUG_C)
1215 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1216 strcat(logbuf, buf);
1218 flow->flags |= FLOW_TL;
1222 /* Check for tcp flags presence (including CWR and ECE). */
1223 if (flow->proto == IPPROTO_TCP
1225 && psize >= 16 - off_frag) {
/* TCP flags byte sits at offset 13 of the TCP header. */
1226 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1227 #if ((DEBUG) & DEBUG_C)
1228 sprintf(buf, " TCP:%x", flow->tcp_flags);
1229 strcat(logbuf, buf);
1233 #if ((DEBUG) & DEBUG_C)
1234 sprintf(buf, " => %x", (unsigned) flow);
1235 strcat(logbuf, buf);
1236 my_log(LOG_DEBUG, "%s", logbuf);
1239 #if ((DEBUG) & DEBUG_I)
1241 pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1242 if (pending_queue_trace < pending_queue_trace_candidate)
1243 pending_queue_trace = pending_queue_trace_candidate;
1246 /* Flow complete - inform unpending_thread() about it */
1247 pending_head->flags |= FLOW_PENDING;
1248 pending_head = pending_head->next;
1250 pthread_cond_signal(&unpending_cond);
1256 /* Copied out of CoDemux */
/* Daemonizes the process: writes the child's pid to PIDFILE from the
 * parent, then (in the child) performs the usual daemon steps.
 * NOTE(review): the fprintf to the pidfile does not appear to be followed
 * by a visible fflush/fclose before the parent exits in this sampled view;
 * and chdir() goes to /usr/local/fprobe rather than the conventional "/". */
1258 static int init_daemon() {
1262 pidfile = fopen(PIDFILE, "w");
1263 if (pidfile == NULL) {
1264 my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
1267 if ((pid = fork()) < 0) {
1269 my_log(LOG_ERR, "Could not fork!\n");
1272 else if (pid != 0) {
1273 /* i'm the parent, writing down the child pid */
1274 fprintf(pidfile, "%u\n", pid);
1279 /* close the pid file */
1282 /* routines for any daemon process
1283 1. create a new session
1284 2. change directory to the root
1285 3. change the file creation permission
1288 chdir("/usr/local/fprobe");
1294 int main(int argc, char **argv)
1297 char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1298 int c, i, write_fd, memory_limit = 0;
1299 struct addrinfo hints, *res;
1300 struct sockaddr_in saddr;
1301 pthread_attr_t tattr;
1302 struct sigaction sigact;
1303 static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1304 struct timeval timeout;
1306 sched_min = sched_get_priority_min(SCHED);
1307 sched_max = sched_get_priority_max(SCHED);
1309 memset(&saddr, 0 , sizeof(saddr));
1310 memset(&hints, 0 , sizeof(hints));
1311 hints.ai_flags = AI_PASSIVE;
1312 hints.ai_family = AF_INET;
1313 hints.ai_socktype = SOCK_DGRAM;
1315 /* Process command line options */
1318 while ((c = my_getopt(argc, argv, parms)) != -1) {
1328 if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1329 if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1330 if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1331 if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1332 if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1333 if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1334 if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1335 if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1336 if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1337 if (parms[nflag].count) {
1338 switch (atoi(parms[nflag].arg)) {
1340 netflow = &NetFlow1;
1347 netflow = &NetFlow7;
1351 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1355 if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1356 if (parms[lflag].count) {
1357 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1360 sprintf(errpbuf, "[%s]", log_suffix);
1361 strcat(ident, errpbuf);
1364 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1365 if (log_suffix) *--log_suffix = ':';
1367 if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1369 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1372 sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1373 if (parms[qflag].count) {
1374 pending_queue_length = atoi(parms[qflag].arg);
1375 if (pending_queue_length < 1) {
1376 fprintf(stderr, "Illegal %s\n", "pending queue length");
1380 if (parms[rflag].count) {
1381 schedp.sched_priority = atoi(parms[rflag].arg);
1382 if (schedp.sched_priority
1383 && (schedp.sched_priority < sched_min
1384 || schedp.sched_priority > sched_max)) {
1385 fprintf(stderr, "Illegal %s\n", "realtime priority");
1389 if (parms[Bflag].count) {
1390 sockbufsize = atoi(parms[Bflag].arg) << 10;
1392 if (parms[bflag].count) {
1393 bulk_quantity = atoi(parms[bflag].arg);
1394 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1395 fprintf(stderr, "Illegal %s\n", "bulk size");
1399 if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1400 if (parms[Xflag].count) {
1401 for(i = 0; parms[Xflag].arg[i]; i++)
1402 if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1403 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1405 rule = strtok(parms[Xflag].arg, ":");
1406 for (i = 0; rule; i++) {
1407 snmp_rules[i].len = strlen(rule);
1408 if (snmp_rules[i].len > IFNAMSIZ) {
1409 fprintf(stderr, "Illegal %s\n", "interface basename");
1412 strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1413 if (!*(rule - 1)) *(rule - 1) = ',';
1414 rule = strtok(NULL, ",");
1416 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1419 snmp_rules[i].base = atoi(rule);
1421 rule = strtok(NULL, ":");
1425 if (parms[tflag].count)
1426 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1427 if (parms[aflag].count) {
1428 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1430 fprintf(stderr, "Illegal %s\n", "source address");
1433 saddr = *((struct sockaddr_in *) res->ai_addr);
1437 if (parms[uflag].count)
1438 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1439 fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1444 /* Process collectors parameters. Brrrr... :-[ */
1446 npeers = argc - optind;
1448 /* Send to remote Netflow collector */
1449 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1450 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1452 if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1454 if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1455 fprintf(stderr, "socket(): %s\n", strerror(errno));
1458 peers[npeers].write_fd = write_fd;
1459 peers[npeers].type = PEER_MIRROR;
1460 peers[npeers].laddr = saddr;
1461 peers[npeers].seq = 0;
1462 if ((lhost = strchr(dport, '/'))) {
1464 if ((type = strchr(lhost, '/'))) {
1472 peers[npeers].type = PEER_ROTATE;
1481 if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1482 peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1486 if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1487 sizeof(struct sockaddr_in))) {
1488 fprintf(stderr, "bind(): %s\n", strerror(errno));
1491 if (getaddrinfo(dhost, dport, &hints, &res)) {
1493 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1496 peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1498 if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1499 sizeof(struct sockaddr_in))) {
1500 fprintf(stderr, "connect(): %s\n", strerror(errno));
1504 /* Restore command line */
1505 if (type) *--type = '/';
1506 if (lhost) *--lhost = '/';
1510 else if (parms[fflag].count) {
1512 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1513 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1514 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1516 peers[npeers].write_fd = START_VALUE;
1517 peers[npeers].type = PEER_FILE;
1518 peers[npeers].seq = 0;
1527 if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1528 ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1530 fprintf(stderr, "libipulog initialization error: %s",
1531 ipulog_strerror(ipulog_errno));
1535 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1536 &sockbufsize, sizeof(sockbufsize)) < 0)
1537 fprintf(stderr, "setsockopt(): %s", strerror(errno));
1539 /* Daemonize (if log destination stdout-free) */
1541 my_log_open(ident, verbosity, log_dest);
1545 if (!(log_dest & 2)) {
1546 /* Crash-proofing - Sapan*/
1550 fprintf(stderr, "fork(): %s", strerror(errno));
1555 freopen("/dev/null", "r", stdin);
1556 freopen("/dev/null", "w", stdout);
1557 freopen("/dev/null", "w", stderr);
1561 while (wait3(NULL,0,NULL) < 1);
1565 setvbuf(stdout, (char *)0, _IONBF, 0);
1566 setvbuf(stderr, (char *)0, _IONBF, 0);
1570 sprintf(errpbuf, "[%ld]", (long) pid);
1571 strcat(ident, errpbuf);
1573 /* Initialization */
1575 hash_init(); /* Actually for crc16 only */
1576 mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1577 for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1580 /* Hope 12 days is enough :-/ */
1581 start_time_offset = 1 << 20;
1583 /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1585 gettime(&start_time);
1588 Build static pending queue as circular buffer.
1590 if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1591 pending_tail = pending_head;
1592 for (i = pending_queue_length - 1; i--;) {
1593 if (!(pending_tail->next = mem_alloc())) {
1595 my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1598 pending_tail = pending_tail->next;
1600 pending_tail->next = pending_head;
1601 pending_tail = pending_head;
1603 sigemptyset(&sig_mask);
1604 sigact.sa_handler = &sighandler;
1605 sigact.sa_mask = sig_mask;
1606 sigact.sa_flags = 0;
1607 sigaddset(&sig_mask, SIGTERM);
1608 sigaction(SIGTERM, &sigact, 0);
1609 #if ((DEBUG) & DEBUG_I)
1610 sigaddset(&sig_mask, SIGUSR1);
1611 sigaction(SIGUSR1, &sigact, 0);
1613 if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1614 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1618 my_log(LOG_INFO, "Starting %s...", VERSION);
1620 if (parms[cflag].count) {
1621 if (chdir(parms[cflag].arg) || chroot(".")) {
1622 my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1627 schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1628 pthread_attr_init(&tattr);
1629 for (i = 0; i < THREADS - 1; i++) {
1630 if (schedp.sched_priority > 0) {
1631 if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1632 (pthread_attr_setschedparam(&tattr, &schedp))) {
1633 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1637 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1638 my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1641 pthread_detach(thid);
1642 schedp.sched_priority++;
1646 if (setgroups(0, NULL)) {
1647 my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1650 if (setregid(pw->pw_gid, pw->pw_gid)) {
1651 my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1654 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1655 my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1660 if (!(pidfile = fopen(pidfilepath, "w")))
1661 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1663 fprintf(pidfile, "%ld\n", (long) pid);
1667 my_log(LOG_INFO, "pid: %d", pid);
1668 my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1669 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1670 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1671 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1672 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1673 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1674 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1675 for (i = 0; i < nsnmp_rules; i++) {
1676 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1677 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1679 for (i = 0; i < npeers; i++) {
1680 switch (peers[i].type) {
1688 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1689 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1690 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1693 pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1695 timeout.tv_usec = 0;
1697 || (total_elements - free_elements - pending_queue_length)
1699 || pending_tail->flags) {
1702 timeout.tv_sec = scan_interval;
1703 select(0, 0, 0, 0, &timeout);
1706 if (sigs & SIGTERM_MASK && !killed) {
1707 sigs &= ~SIGTERM_MASK;
1708 my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1711 active_lifetime = -1;
1712 inactive_lifetime = -1;
1714 unpending_timeout = 1;
1716 pthread_cond_signal(&scan_cond);
1717 pthread_cond_signal(&unpending_cond);
1720 #if ((DEBUG) & DEBUG_I)
1721 if (sigs & SIGUSR1_MASK) {
1722 sigs &= ~SIGUSR1_MASK;
1727 remove(pidfilepath);
1728 #if ((DEBUG) & DEBUG_I)
1731 my_log(LOG_INFO, "Done.");