2 Copyright (C) Slava Astashonok <sla@0n.ru>
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License.
7 $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
9 Sapan Bhatia <sapanb@cs.princeton.edu>
11 7/11/2007 Added data collection (-f) functionality, xid support in the header and log file
14 15/11/2007 Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
20 /* stdout, stderr, freopen() */
26 /* getopt(), alarm(), getpid(), setsid(), chdir() */
39 #include <libipulog/libipulog.h>
/* Local mirror of libipulog's (otherwise opaque) handle layout, declared so
 * main() can reach ulog_handle->fd for setsockopt(SO_RCVBUF).
 * NOTE(review): partial view — the fd member and closing brace are missing
 * from this excerpt; must stay in sync with libipulog/libipulog.h. */
40 struct ipulog_handle {
43 struct sockaddr_nl local;
44 struct sockaddr_nl peer;
45 struct nlmsghdr* last_nlhdr;
48 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
49 #include <sys/types.h>
50 #include <netinet/in_systm.h>
51 #include <sys/socket.h>
52 #include <netinet/in.h>
53 #include <arpa/inet.h>
54 #include <netinet/ip.h>
55 #include <netinet/tcp.h>
56 #include <netinet/udp.h>
57 #include <netinet/ip_icmp.h>
60 #include <sys/param.h>
85 #include <sys/select.h>
91 #include <fprobe-ulog.h>
93 #include <my_getopt.h>
/* Default pidfile path (overridden via pidfilepath built in main()). */
98 #define PIDFILE "/var/log/fprobe-ulog.pid"
/* Pad emitted NetFlow PDUs up to the standard size (see emit loop). */
99 #define STD_NETFLOW_PDU
/* my_getopt() option table: one entry per flag, all taking an argument.
 * Indexed elsewhere via the *flag enum constants (aflag, Bflag, ...).
 * NOTE(review): partial view — entries for some options shown in the usage
 * text (e.g. -h, -G, -M, -W, -y) and the table terminator are missing from
 * this excerpt. */
129 static struct getopt_parms parms[] = {
130 {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
131 {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
132 {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
133 {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
134 {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
135 {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
136 {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
137 {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
138 {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
139 {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
141 {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
142 {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
144 {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
145 {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
146 {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
147 {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
148 {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
149 {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
150 {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
151 {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
152 {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
153 {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
/* getopt state from libc / my_getopt. */
158 extern int optind, opterr, optopt;
/* NetFlow v1/v5/v7 format descriptors, defined in netflow.c. */
161 extern struct NetFlow NetFlow1;
162 extern struct NetFlow NetFlow5;
163 extern struct NetFlow NetFlow7;
/* Sentinel for a peer file descriptor that has not been opened yet
 * (see get_log_fd() and the PEER_FILE setup in main()). */
165 #define START_VALUE -5
/* -M: treat the netfilter mark as the ToS byte (see cap_thread). */
166 #define mark_is_tos parms[Mflag].count
/* Tunables, overridable from the command line (see usage text). */
167 static unsigned scan_interval = 5;
168 static int min_free = 0;
169 static int frag_lifetime = 30;
170 static int inactive_lifetime = 60;
171 static int active_lifetime = 300;
172 static int sockbufsize;
/* Largest bulk size representable by the allocator's index type. */
173 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
174 #if (MEM_BITS == 0) || (MEM_BITS == 16)
175 #define BULK_QUANTITY 10000
177 #define BULK_QUANTITY 200
/* Epoch-based log rotation state (-E/-T/-W options). */
180 static unsigned epoch_length=60, log_epochs=1;
181 static unsigned cur_epoch=0,prev_uptime=0;
183 static unsigned bulk_quantity = BULK_QUANTITY;
184 static unsigned pending_queue_length = 100;
/* Active NetFlow version descriptor; v5 unless -n overrides. */
185 static struct NetFlow *netflow = &NetFlow5;
186 static unsigned verbosity = 6;
187 static unsigned log_dest = MY_LOG_SYSLOG;
/* Wall-clock reference for uptime computations (getuptime*). */
188 static struct Time start_time;
189 static long start_time_offset;
/* Allocator statistics, defined in mem.c. */
192 extern unsigned total_elements;
193 extern unsigned free_elements;
194 extern unsigned total_memory;
/* Instrumentation counters, compiled in only with DEBUG_I. */
195 #if ((DEBUG) & DEBUG_I)
196 static unsigned emit_pkts, emit_queue;
197 static uint64_t size_total;
198 static unsigned pkts_total, pkts_total_fragmented;
199 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
200 static unsigned pkts_pending, pkts_pending_done;
201 static unsigned pending_queue_trace, pending_queue_trace_candidate;
202 static unsigned flows_total, flows_fragmented;
/* emit_thread() packet-building state. */
204 static unsigned emit_count;
205 static uint32_t emit_sequence;
206 static unsigned emit_rate_bytes, emit_rate_delay;
207 static struct Time emit_time;
208 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
209 static pthread_t thid;
210 static sigset_t sig_mask;
211 static struct sched_param schedp;
212 static int sched_min, sched_max;
213 static int npeers, npeers_rot;
214 static struct peer *peers;
/* Flow hash table with one mutex per bucket. */
217 static struct Flow *flows[1 << HASH_BITS];
218 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
/* cap_thread -> unpending_thread handoff. */
220 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
221 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
/* scan_thread wakeup. */
223 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
224 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
225 static struct Flow *pending_head, *pending_tail;
/* Chain of expired fragmented flows awaiting re-insertion (scan_thread). */
226 static struct Flow *scan_frag_dreg;
/* scan_thread -> emit_thread handoff. */
228 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
229 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
230 static struct Flow *flows_emit;
/* Process identity for logging / pidfile naming. */
232 static char ident[256] = "fprobe-ulog";
233 static FILE *pidfile;
234 static char *pidfilepath;
237 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
/* ULOG capture state. */
238 static struct ipulog_handle *ulog_handle;
239 static uint32_t ulog_gmask = 1;
240 static char *cap_buf;
/* -X interface-name -> SNMP-index conversion rules. */
241 static int nsnmp_rules;
242 static struct snmp_rule *snmp_rules;
/* -u: unprivileged user to run as (see setuser in emit path). */
243 static struct passwd *pw = 0;
/* Help text fragment (body of usage()); printed with VERSION and the
 * run-time limits interpolated at the end.
 * NOTE(review): "idetifier" in the -l line is a typo in a user-visible
 * runtime string ("identifier"); left untouched here — fixing it changes
 * program output.  The enclosing printf/fprintf call and function header
 * are missing from this excerpt. */
"fprobe-ulog: a NetFlow probe. Version %s\n"
"Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
"-h\t\tDisplay this help\n"
"-U <mask>\tULOG group bitwise mask [1]\n"
"-s <seconds>\tHow often scan for expired flows [5]\n"
"-g <seconds>\tFragmented flow lifetime [30]\n"
"-e <seconds>\tActive flow lifetime (active timer) [300]\n"
"-f <filename>\tLog flow data in a file\n"
"-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
"-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
"-a <address>\tUse <address> as source for NetFlow flow\n"
"-X <rules>\tInterface name to SNMP-index conversion rules\n"
"-M\t\tUse netfilter mark value as ToS flag\n"
"-b <flows>\tMemory bulk size (1..%u) [%u]\n"
"-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
"-q <flows>\tPending queue length [100]\n"
"-B <kilobytes>\tKernel capture buffer size [0]\n"
"-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
"-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
"-c <directory>\tDirectory to chroot to\n"
"-u <user>\tUser to run as\n"
"-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
"-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
"-y <remote:port>\tAddress of the NetFlow collector\n"
"-f <writable file>\tFile to write data into\n"
"-T <n>\tRotate log file every n epochs\n"
"-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
"-E <[1..60]>\tSize of an epoch in minutes\n"
"-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
/* DEBUG_I-only statistics dump: logs the counters maintained by the capture,
 * unpending, scan and emit threads (typically triggered via SIGUSR1).
 * NOTE(review): the enclosing function header is missing from this excerpt. */
#if ((DEBUG) & DEBUG_I)
my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
pkts_total, pkts_total_fragmented, size_total,
pkts_pending - pkts_pending_done, pending_queue_trace);
my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
total_elements, free_elements, total_memory);
/* Async signal handler: records the delivered signal in the `sigs` bitmask
 * for the main loop to act on later (no unsafe work done here).
 * NOTE(review): the switch/case lines selecting between SIGTERM and SIGUSR1
 * are missing from this excerpt. */
298 void sighandler(int sig)
302 sigs |= SIGTERM_MASK;
304 #if ((DEBUG) & DEBUG_I)
306 sigs |= SIGUSR1_MASK;
/* Fill *now from the wall clock (gettimeofday-style sec/usec pair).
 * NOTE(review): the timeval declaration, the gettimeofday() call and the
 * now->sec assignment are missing from this excerpt. */
312 void gettime(struct Time *now)
318 now->usec = t.tv_usec;
322 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
324 return (t1->sec - t2->sec)/60;
327 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
329 return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
332 /* Uptime in miliseconds */
333 uint32_t getuptime(struct Time *t)
335 /* Maximum uptime is about 49/2 days */
336 return cmpmtime(t, &start_time);
339 /* Uptime in minutes */
340 uint32_t getuptime_minutes(struct Time *t)
342 /* Maximum uptime is about 49/2 days */
343 return cmpMtime(t, &start_time);
346 hash_t hash_flow(struct Flow *flow)
348 if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
349 else return hash(flow, sizeof(struct Flow_TL));
/* Map an interface name to an SNMP index: first try the -X conversion rules
 * (basename prefix match, then numeric suffix + base), else fall back to
 * if_nametoindex(3).  Empty name maps to 0.
 * NOTE(review): the declaration of `i`, loop closing brace and the final
 * fallback return are missing from this excerpt. */
352 uint16_t snmp_index(char *name) {
355 if (!*name) return 0;
357 for (i = 0; (int) i < nsnmp_rules; i++) {
358 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
359 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
362 if ((i = if_nametoindex(name))) return i;
/* Field-by-field copy of a Flow record (used when inserting a captured
 * packet into the hash table, see put_into(COPY_INTO)).
 * NOTE(review): assignments for several members (addresses, ports, ifindexes)
 * are missing from this excerpt. */
367 inline void copy_flow(struct Flow *src, struct Flow *dst)
374 dst->proto = src->proto;
375 dst->tcp_flags = src->tcp_flags;
379 dst->pkts = src->pkts;
380 dst->size = src->size;
381 dst->sizeF = src->sizeF;
382 dst->sizeP = src->sizeP;
383 dst->ctime = src->ctime;
384 dst->mtime = src->mtime;
385 dst->flags = src->flags;
/* Restore cur_epoch from /tmp/fprobe_last_epoch after a restart, then bump
 * it by one so we never reuse the epoch that was current when we died.
 * NOTE(review): fd/len/snum declarations, error handling and close() are
 * missing from this excerpt. */
388 void get_cur_epoch() {
390 fd = open("/tmp/fprobe_last_epoch",O_RDONLY);
394 len = read(fd, snum, sizeof(snum)-1);
397 sscanf(snum,"%d",&cur_epoch);
398 cur_epoch++; /* Let's not stone the last epoch */
/*
 * Persist epoch number n to /tmp/fprobe_last_epoch so a restarted fprobe
 * can resume from the following epoch (see get_cur_epoch()).
 *
 * Fix: open(2) with O_CREAT requires a third (mode) argument; the original
 * call omitted it, so a freshly created file received garbage permissions
 * (undefined behavior per POSIX).  0644 is supplied explicitly now, and the
 * descriptor is checked and closed on all paths.
 */
void update_cur_epoch_file(int n) {
	int fd, len;
	/* NOTE(review): buffer width assumed; the original declaration is not
	 * visible in this excerpt.  7 covers the 6-byte snprintf limit + NUL. */
	char snum[7];

	len=snprintf(snum,6,"%d",n);
	fd = open("/tmp/fprobe_last_epoch",O_WRONLY|O_CREAT|O_TRUNC, 0644);
	if (fd == -1) {
		my_log(LOG_ERR, "open() failed: /tmp/fprobe_last_epoch.The next restart will resume logging from epoch id 0.");
		return;
	}
	write(fd, snum, len);
	close(fd);
}
/* Return a file descriptor for the current epoch's log file, rotating to a
 * new "<fname>.<epoch>" file when the epoch expires, and recycling old
 * epochs when free disk blocks fall below min_free (-D option).
 * NOTE(review): several lines (variable declarations, switch header, case
 * bodies, returns, closing braces) are missing from this excerpt.
 * NOTE(review): open() at the O_WRONLY|O_CREAT|O_TRUNC call below lacks the
 * mandatory mode argument — same defect as update_cur_epoch_file(); should
 * pass e.g. 0644. */
419 unsigned get_log_fd(char *fname, int cur_fd) {
422 /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
423 * doesn't solve the problem */
425 struct statfs statfs;
428 cur_uptime = getuptime_minutes(&now);
431 if (cur_fd!=START_VALUE) {
432 if (fstatfs(cur_fd, &statfs) == -1) {
433 my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
436 if (min_free && (statfs.f_bavail < min_free))
438 case 0: /* Uh oh. Our first file filled up all of the free space. Just bail out. */
439 my_log(LOG_ERR, "The first epoch filled up all the free space on disk. Bailing out.");
442 my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
448 /* Epoch length in minutes */
449 if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
450 char nextname[MAX_PATH_LEN];
452 prev_uptime = cur_uptime;
453 cur_epoch = (cur_epoch + 1) % log_epochs;
456 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
457 if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC)) < 0) {
458 my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
461 update_cur_epoch_file(cur_epoch);
/* Search a hash-bucket chain for a flow matching *what: same addresses and
 * protocol, then either matching ports (both unfragmented) or matching IP id
 * (both fragmented).  On return, *prev (if given) points at the link slot of
 * the matched/last node so callers can unlink in place.
 * NOTE(review): the loop header, `done:` label and return statements are
 * missing from this excerpt. */
470 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
472 struct Flow **flowpp;
478 if (prev) flowpp = *prev;
481 if (where->sip.s_addr == what->sip.s_addr
482 && where->dip.s_addr == what->dip.s_addr
483 && where->proto == what->proto) {
484 switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
486 /* Both unfragmented */
487 if ((what->sp == where->sp)
488 && (what->dp == where->dp)) goto done;
491 /* Both fragmented */
492 if (where->id == what->id) goto done;
496 flowpp = &where->next;
500 if (prev) *prev = flowpp;
/* Insert or merge *flow into the hash table under the bucket mutex.
 * flag == COPY_INTO allocates a fresh node and copies; MOVE_INTO links the
 * node itself (and frees it after a merge).  When the last fragment of a
 * fragmented flow arrives, the reassembled flow is unlinked and recursively
 * re-inserted without FLOW_FRAG.
 * NOTE(review): many lines (hash computation, returns, some closing braces,
 * DEBUG buffers) are missing from this excerpt. */
504 int put_into(struct Flow *flow, int flag
505 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
512 struct Flow *flown, **flowpp;
513 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
518 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
519 sprintf(buf, " %x H:%04x", (unsigned) flow, h);
522 pthread_mutex_lock(&flows_mutex[h]);
524 if (!(flown = find(flows[h], flow, &flowpp))) {
525 /* No suitable flow found - add */
526 if (flag == COPY_INTO) {
527 if ((flown = mem_alloc())) {
528 copy_flow(flow, flown);
531 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
532 my_log(LOG_ERR, "%s %s. %s",
533 "mem_alloc():", strerror(errno), "packet lost");
538 flow->next = flows[h];
540 #if ((DEBUG) & DEBUG_I)
542 if (flow->flags & FLOW_FRAG) flows_fragmented++;
544 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
546 sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
551 /* Found suitable flow - update */
552 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
553 sprintf(buf, " +> %x", (unsigned) flown);
/* Widen the flow's time window to cover the new packet. */
556 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
557 flown->mtime = flow->mtime;
558 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
559 flown->ctime = flow->ctime;
560 flown->tcp_flags |= flow->tcp_flags;
561 flown->size += flow->size;
562 flown->pkts += flow->pkts;
563 if (flow->flags & FLOW_FRAG) {
564 /* Fragmented flow require some additional work */
565 if (flow->flags & FLOW_TL) {
568 Several packets with FLOW_TL (attack)
570 flown->sp = flow->sp;
571 flown->dp = flow->dp;
573 if (flow->flags & FLOW_LASTFRAG) {
576 Several packets with FLOW_LASTFRAG (attack)
578 flown->sizeP = flow->sizeP;
580 flown->flags |= flow->flags;
581 flown->sizeF += flow->sizeF;
582 if ((flown->flags & FLOW_LASTFRAG)
583 && (flown->sizeF >= flown->sizeP)) {
584 /* All fragments received - flow reassembled */
585 *flowpp = flown->next;
586 pthread_mutex_unlock(&flows_mutex[h]);
587 #if ((DEBUG) & DEBUG_I)
592 flown->flags &= ~FLOW_FRAG;
593 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
/* Recursive re-insert of the reassembled flow. */
596 ret = put_into(flown, MOVE_INTO
597 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
603 if (flag == MOVE_INTO) mem_free(flow);
605 pthread_mutex_unlock(&flows_mutex[h]);
/* Serialize `fields` NetFlow fields described by `format` into the buffer at
 * p, reading values from *flow (or from emit_* globals for header fields,
 * where flow is NULL).  Returns the advanced write pointer.
 * NOTE(review): many case labels, break statements and the final return are
 * missing from this excerpt; the inet_addr("64.34.177.39") comparison is a
 * hard-coded test hook (see the 7/11/2007 changelog entry). */
609 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
613 for (i = 0; i < fields; i++) {
614 #if ((DEBUG) & DEBUG_F)
615 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
618 case NETFLOW_IPV4_SRC_ADDR:
619 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
620 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
623 case NETFLOW_IPV4_DST_ADDR:
624 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
625 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
626 my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
628 p += NETFLOW_IPV4_DST_ADDR_SIZE;
631 case NETFLOW_INPUT_SNMP:
632 *((uint16_t *) p) = htons(flow->iif);
633 p += NETFLOW_INPUT_SNMP_SIZE;
636 case NETFLOW_OUTPUT_SNMP:
637 *((uint16_t *) p) = htons(flow->oif);
638 p += NETFLOW_OUTPUT_SNMP_SIZE;
641 case NETFLOW_PKTS_32:
642 *((uint32_t *) p) = htonl(flow->pkts);
643 p += NETFLOW_PKTS_32_SIZE;
646 case NETFLOW_BYTES_32:
647 *((uint32_t *) p) = htonl(flow->size);
648 p += NETFLOW_BYTES_32_SIZE;
651 case NETFLOW_FIRST_SWITCHED:
652 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
653 p += NETFLOW_FIRST_SWITCHED_SIZE;
656 case NETFLOW_LAST_SWITCHED:
657 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
658 p += NETFLOW_LAST_SWITCHED_SIZE;
/* Ports are captured in network byte order already — no htons here. */
661 case NETFLOW_L4_SRC_PORT:
662 *((uint16_t *) p) = flow->sp;
663 p += NETFLOW_L4_SRC_PORT_SIZE;
666 case NETFLOW_L4_DST_PORT:
667 *((uint16_t *) p) = flow->dp;
668 p += NETFLOW_L4_DST_PORT_SIZE;
672 *((uint8_t *) p) = flow->proto;
673 p += NETFLOW_PROT_SIZE;
676 case NETFLOW_SRC_TOS:
677 *((uint8_t *) p) = flow->tos;
678 p += NETFLOW_SRC_TOS_SIZE;
681 case NETFLOW_TCP_FLAGS:
682 *((uint8_t *) p) = flow->tcp_flags;
683 p += NETFLOW_TCP_FLAGS_SIZE;
/* Header fields below read the emit_* globals, not *flow. */
686 case NETFLOW_VERSION:
687 *((uint16_t *) p) = htons(netflow->Version);
688 p += NETFLOW_VERSION_SIZE;
692 *((uint16_t *) p) = htons(emit_count);
693 p += NETFLOW_COUNT_SIZE;
697 *((uint32_t *) p) = htonl(getuptime(&emit_time));
698 p += NETFLOW_UPTIME_SIZE;
701 case NETFLOW_UNIX_SECS:
702 *((uint32_t *) p) = htonl(emit_time.sec);
703 p += NETFLOW_UNIX_SECS_SIZE;
706 case NETFLOW_UNIX_NSECS:
707 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
708 p += NETFLOW_UNIX_NSECS_SIZE;
711 case NETFLOW_FLOW_SEQUENCE:
712 //*((uint32_t *) p) = htonl(emit_sequence);
713 *((uint32_t *) p) = 0;
714 p += NETFLOW_FLOW_SEQUENCE_SIZE;
718 /* Unsupported (uint8_t) */
719 case NETFLOW_ENGINE_TYPE:
720 case NETFLOW_ENGINE_ID:
721 case NETFLOW_FLAGS7_1:
722 case NETFLOW_SRC_MASK:
723 case NETFLOW_DST_MASK:
724 *((uint8_t *) p) = 0;
725 p += NETFLOW_PAD8_SIZE;
/* xid support: ToS doubles as the slice xid (see changelog). */
728 *((uint16_t *) p) = flow->tos;
729 p += NETFLOW_XID_SIZE;
732 /* Unsupported (uint16_t) */
735 case NETFLOW_FLAGS7_2:
736 *((uint16_t *) p) = 0;
737 p += NETFLOW_PAD16_SIZE;
741 /* Unsupported (uint32_t) */
742 case NETFLOW_IPV4_NEXT_HOP:
743 case NETFLOW_ROUTER_SC:
744 *((uint32_t *) p) = 0;
745 p += NETFLOW_PAD32_SIZE;
749 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
750 format, i, format[i]);
754 #if ((DEBUG) & DEBUG_F)
755 my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
/* Body of emit_thread(): batches expired flows from flows_emit into a
 * NetFlow PDU, fills the header and writes/sends it to every configured
 * peer (file, mirror or rotate), with optional rate limiting (-t).
 * NOTE(review): the function header, loop/branch structure and several
 * closing braces are missing from this excerpt. */
762 Workaround for clone()-based threads
763 Try to change EUID independently of main thread
767 setregid(pw->pw_gid, pw->pw_gid);
768 setreuid(pw->pw_uid, pw->pw_uid);
777 struct timespec timeout;
778 int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
780 p = (void *) &emit_packet + netflow->HeaderSize;
786 pthread_mutex_lock(&emit_mutex);
787 while (!flows_emit) {
788 gettimeofday(&now, 0);
789 timeout.tv_sec = now.tv_sec + emit_timeout;
790 /* Do not wait until emit_packet is filled - it may take too long */
791 if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
792 pthread_mutex_unlock(&emit_mutex);
797 flows_emit = flows_emit->next;
798 #if ((DEBUG) & DEBUG_I)
801 pthread_mutex_unlock(&emit_mutex);
805 gettime(&start_time);
806 start_time.sec -= start_time_offset;
809 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
813 printf("Emit count = %d\n", emit_count);
816 if (emit_count == netflow->MaxFlows) {
819 p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
820 size = netflow->HeaderSize + emit_count * netflow->FlowSize;
821 /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
822 #ifdef STD_NETFLOW_PDU
823 if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
826 for (i = 0; i < npeers; i++) {
827 if (peers[i].type == PEER_FILE) {
828 if (netflow->SeqOffset)
829 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
830 peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
831 ret = write(peers[i].write_fd, emit_packet, size);
834 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
835 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
836 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
840 #if ((DEBUG) & DEBUG_E)
842 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
843 emit_count, i + 1, peers[i].seq);
846 peers[i].seq += emit_count;
/* Optional pacing: delay proportional to bytes sent (-t B:N option). */
849 if (emit_rate_bytes) {
851 delay = sent / emit_rate_bytes;
853 sent %= emit_rate_bytes;
855 timeout.tv_nsec = emit_rate_delay * delay;
856 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
861 if (peers[i].type == PEER_MIRROR) goto sendreal;
863 if (peers[i].type == PEER_ROTATE)
864 if (peer_rot_cur++ == peer_rot_work) {
866 if (netflow->SeqOffset)
867 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
868 ret = send(peers[i].write_fd, emit_packet, size, 0);
870 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
871 my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
872 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
875 #if ((DEBUG) & DEBUG_E)
877 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
878 emit_count, i + 1, peers[i].seq);
881 peers[i].seq += emit_count;
884 if (emit_rate_bytes) {
886 delay = sent / emit_rate_bytes;
888 sent %= emit_rate_bytes;
890 timeout.tv_nsec = emit_rate_delay * delay;
891 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
/* Round-robin advance for PEER_ROTATE targets. */
896 if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
897 emit_sequence += emit_count;
899 #if ((DEBUG) & DEBUG_I)
/* Drain the pending ring: wait (with timeout) until cap_thread marks the
 * tail entry FLOW_PENDING, then copy it into the hash table via
 * put_into(COPY_INTO) and advance the tail.
 * NOTE(review): the outer loop structure and several closing braces are
 * missing from this excerpt. */
906 void *unpending_thread()
909 struct timespec timeout;
910 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
917 pthread_mutex_lock(&unpending_mutex);
920 while (!(pending_tail->flags & FLOW_PENDING)) {
921 gettimeofday(&now, 0);
922 timeout.tv_sec = now.tv_sec + unpending_timeout;
923 pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
926 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
929 if (put_into(pending_tail, COPY_INTO
930 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
934 #if ((DEBUG) & DEBUG_I)
935 pkts_lost_unpending++;
939 #if ((DEBUG) & DEBUG_U)
940 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
/* Hand the slot back to cap_thread and move on. */
943 pending_tail->flags = 0;
944 pending_tail = pending_tail->next;
945 #if ((DEBUG) & DEBUG_I)
/* Body of scan_thread(): every scan_interval seconds walk all hash buckets;
 * move expired fragmented flows onto scan_frag_dreg for re-insertion as
 * unfragmented, and move inactive/over-age complete flows onto flows_emit
 * for emit_thread.
 * NOTE(review): the function header, inner while-loop header and several
 * closing braces are missing from this excerpt. */
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
957 struct Flow *flow, **flowpp;
959 struct timespec timeout;
964 pthread_mutex_lock(&scan_mutex);
968 timeout.tv_sec = now.sec + scan_interval;
969 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
972 #if ((DEBUG) & DEBUG_S)
973 my_log(LOG_DEBUG, "S: %d", now.sec);
975 for (i = 0; i < 1 << HASH_BITS ; i++) {
976 pthread_mutex_lock(&flows_mutex[i]);
980 if (flow->flags & FLOW_FRAG) {
981 /* Process fragmented flow */
982 if ((now.sec - flow->mtime.sec) > frag_lifetime) {
983 /* Fragmented flow expired - put it into special chain */
984 #if ((DEBUG) & DEBUG_I)
988 *flowpp = flow->next;
990 flow->flags &= ~FLOW_FRAG;
991 flow->next = scan_frag_dreg;
992 scan_frag_dreg = flow;
997 /* Flow is not fragmented */
998 if ((now.sec - flow->mtime.sec) > inactive_lifetime
999 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
1001 #if ((DEBUG) & DEBUG_S)
1002 my_log(LOG_DEBUG, "S: E %x", flow);
1004 #if ((DEBUG) & DEBUG_I)
1007 *flowpp = flow->next;
1008 pthread_mutex_lock(&emit_mutex);
1009 flow->next = flows_emit;
1011 #if ((DEBUG) & DEBUG_I)
1014 pthread_mutex_unlock(&emit_mutex);
1019 flowpp = &flow->next;
1022 pthread_mutex_unlock(&flows_mutex[i]);
1024 if (flows_emit) pthread_cond_signal(&emit_cond);
/* Re-insert expired fragmented flows as unfragmented (best effort). */
1026 while (scan_frag_dreg) {
1027 flow = scan_frag_dreg;
1028 scan_frag_dreg = flow->next;
1029 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1032 put_into(flow, MOVE_INTO
1033 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1037 #if ((DEBUG) & DEBUG_S)
1038 my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
/* Body of cap_thread(): read packets from the ULOG netlink socket, parse the
 * IPv4/transport headers into the head slot of the pending ring, then mark
 * it FLOW_PENDING and signal unpending_thread.
 * NOTE(review): the function header, outer read loop, several case labels
 * and closing braces are missing from this excerpt; the
 * inet_addr("64.34.177.39") comparison is a hard-coded test hook (see the
 * 15/11/2007 changelog entry). */
1046 struct ulog_packet_msg *ulog_msg;
1050 int len, off_frag, psize;
1051 #if ((DEBUG) & DEBUG_C)
1059 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1061 my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1064 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1066 #if ((DEBUG) & DEBUG_C)
1067 sprintf(logbuf, "C: %d", ulog_msg->data_len);
1070 nl = (void *) &ulog_msg->payload;
1071 psize = ulog_msg->data_len;
/* Only IPv4 packets with a complete IP header are processed. */
1074 if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1075 #if ((DEBUG) & DEBUG_C)
1076 strcat(logbuf, " U");
1077 my_log(LOG_DEBUG, "%s", logbuf);
1079 #if ((DEBUG) & DEBUG_I)
/* Head slot still owned by unpending_thread -> queue is full. */
1085 if (pending_head->flags) {
1086 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1088 # if ((DEBUG) & DEBUG_C)
1093 "pending queue full:", "packet lost");
1095 #if ((DEBUG) & DEBUG_I)
1096 pkts_lost_capture++;
1101 #if ((DEBUG) & DEBUG_I)
1105 flow = pending_head;
1107 /* ?FIXME? Add sanity check for ip_len? */
1108 flow->size = ntohs(nl->ip_len);
1109 #if ((DEBUG) & DEBUG_I)
1110 size_total += flow->size;
1113 flow->sip = nl->ip_src;
1114 flow->dip = nl->ip_dst;
/* -M option: netfilter mark doubles as ToS / slice xid. */
1115 flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1116 if ((flow->dip.s_addr == inet_addr("64.34.177.39")) || (flow->sip.s_addr == inet_addr("64.34.177.39"))) {
1117 my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->tos);
1119 flow->iif = snmp_index(ulog_msg->indev_name);
1120 flow->oif = snmp_index(ulog_msg->outdev_name);
1121 flow->proto = nl->ip_p;
1123 flow->tcp_flags = 0;
1127 /* Packets captured from the OUTPUT table don't contain a valid timestamp */
1128 if (ulog_msg->timestamp_sec) {
1129 flow->ctime.sec = ulog_msg->timestamp_sec;
1130 flow->ctime.usec = ulog_msg->timestamp_usec;
1131 } else gettime(&flow->ctime);
1132 flow->mtime = flow->ctime;
1134 off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1137 Offset (from network layer) to transport layer header/IP data
1138 IOW IP header size ;-)
1141 Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1143 off_tl = nl->ip_hl << 2;
1144 tl = (void *) nl + off_tl;
1146 /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1147 flow->sizeF = ntohs(nl->ip_len) - off_tl;
1149 if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1150 if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1152 if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1153 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1154 #if ((DEBUG) & DEBUG_C)
1155 strcat(logbuf, " F");
1157 #if ((DEBUG) & DEBUG_I)
1158 pkts_total_fragmented++;
1160 flow->flags |= FLOW_FRAG;
1161 flow->id = nl->ip_id;
1163 if (!(ntohs(nl->ip_off) & IP_MF)) {
1164 /* Last fragment (IP_MF clear) tells the whole datagram size */
1165 flow->flags |= FLOW_LASTFRAG;
1166 /* size = frag_offset*8 + data_size */
1167 flow->sizeP = off_frag + flow->sizeF;
1171 #if ((DEBUG) & DEBUG_C)
1172 sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1173 strcat(logbuf, buf);
1174 sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1175 strcat(logbuf, buf);
1179 Fortunately most interesting transport layer information fit
1180 into first 8 bytes of IP data field (minimal nonzero size).
1181 Thus we don't need actual packet reassembling to build whole
1182 transport layer data. We only check the fragment offset for
1183 zero value to find packet with this information.
1185 if (!off_frag && psize >= 8) {
1186 switch (flow->proto) {
1189 flow->sp = ((struct udphdr *)tl)->uh_sport;
1190 flow->dp = ((struct udphdr *)tl)->uh_dport;
/* ICMP: encode type/code in the port fields (NetFlow convention). */
1195 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1196 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1199 #ifdef ICMP_TRICK_CISCO
1201 flow->dp = *((int32_t *) tl);
1206 /* Unknown transport layer */
1207 #if ((DEBUG) & DEBUG_C)
1208 strcat(logbuf, " U");
1215 #if ((DEBUG) & DEBUG_C)
1216 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1217 strcat(logbuf, buf);
1219 flow->flags |= FLOW_TL;
1223 /* Check for tcp flags presence (including CWR and ECE). */
1224 if (flow->proto == IPPROTO_TCP
1226 && psize >= 16 - off_frag) {
1227 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1228 #if ((DEBUG) & DEBUG_C)
1229 sprintf(buf, " TCP:%x", flow->tcp_flags);
1230 strcat(logbuf, buf);
1234 #if ((DEBUG) & DEBUG_C)
1235 sprintf(buf, " => %x", (unsigned) flow);
1236 strcat(logbuf, buf);
1237 my_log(LOG_DEBUG, "%s", logbuf);
1240 #if ((DEBUG) & DEBUG_I)
1242 pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1243 if (pending_queue_trace < pending_queue_trace_candidate)
1244 pending_queue_trace = pending_queue_trace_candidate;
1247 /* Flow complete - inform unpending_thread() about it */
1248 pending_head->flags |= FLOW_PENDING;
1249 pending_head = pending_head->next;
1251 pthread_cond_signal(&unpending_cond);
1257 /* Copied out of CoDemux */
/* Classic double-step daemonization: write the child's pid to PIDFILE from
 * the parent, then (in the child) detach from the controlling terminal and
 * chdir to the working directory.
 * NOTE(review): the setsid()/umask() lines and parent exit are missing from
 * this excerpt. */
1259 static int init_daemon() {
1263 pidfile = fopen(PIDFILE, "w");
1264 if (pidfile == NULL) {
1265 my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
1268 if ((pid = fork()) < 0) {
1270 my_log(LOG_ERR, "Could not fork!\n");
1273 else if (pid != 0) {
1274 /* i'm the parent, writing down the child pid */
1275 fprintf(pidfile, "%u\n", pid);
1280 /* close the pid file */
1283 /* routines for any daemon process
1284 1. create a new session
1285 2. change directory to the root
1286 3. change the file creation permission
1289 chdir("/var/local/fprobe");
/* Entry point: parse options, build the peer list (collectors / log file),
 * open the ULOG capture handle, daemonize, and initialize the allocator,
 * hash table and worker threads.
 * NOTE(review): large parts (option switch cases, error labels, thread
 * startup, main select loop) are missing from this excerpt. */
1295 int main(int argc, char **argv)
1298 char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1299 int c, i, write_fd, memory_limit = 0;
1300 struct addrinfo hints, *res;
1301 struct sockaddr_in saddr;
1302 pthread_attr_t tattr;
1303 struct sigaction sigact;
1304 static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1305 struct timeval timeout;
1307 sched_min = sched_get_priority_min(SCHED);
1308 sched_max = sched_get_priority_max(SCHED);
1310 memset(&saddr, 0 , sizeof(saddr));
1311 memset(&hints, 0 , sizeof(hints));
1312 hints.ai_flags = AI_PASSIVE;
1313 hints.ai_family = AF_INET;
1314 hints.ai_socktype = SOCK_DGRAM;
1316 /* Process command line options */
1319 while ((c = my_getopt(argc, argv, parms)) != -1) {
1329 if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1330 if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1331 if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1332 if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1333 if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1334 if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1335 if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1336 if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1337 if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1338 if (parms[nflag].count) {
1339 switch (atoi(parms[nflag].arg)) {
1341 netflow = &NetFlow1;
1348 netflow = &NetFlow7;
1352 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1356 if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
/* -l [dst][:id]: optional suffix is appended to the log ident. */
1357 if (parms[lflag].count) {
1358 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1361 sprintf(errpbuf, "[%s]", log_suffix);
1362 strcat(ident, errpbuf);
1365 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1366 if (log_suffix) *--log_suffix = ':';
1368 if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1370 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1373 sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1374 if (parms[qflag].count) {
1375 pending_queue_length = atoi(parms[qflag].arg);
1376 if (pending_queue_length < 1) {
1377 fprintf(stderr, "Illegal %s\n", "pending queue length");
1381 if (parms[rflag].count) {
1382 schedp.sched_priority = atoi(parms[rflag].arg);
1383 if (schedp.sched_priority
1384 && (schedp.sched_priority < sched_min
1385 || schedp.sched_priority > sched_max)) {
1386 fprintf(stderr, "Illegal %s\n", "realtime priority");
1390 if (parms[Bflag].count) {
1391 sockbufsize = atoi(parms[Bflag].arg) << 10;
1393 if (parms[bflag].count) {
1394 bulk_quantity = atoi(parms[bflag].arg);
1395 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1396 fprintf(stderr, "Illegal %s\n", "bulk size");
1400 if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
/* -X: parse "basename:base,basename:base,..." SNMP conversion rules. */
1401 if (parms[Xflag].count) {
1402 for(i = 0; parms[Xflag].arg[i]; i++)
1403 if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1404 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1406 rule = strtok(parms[Xflag].arg, ":");
1407 for (i = 0; rule; i++) {
1408 snmp_rules[i].len = strlen(rule);
1409 if (snmp_rules[i].len > IFNAMSIZ) {
1410 fprintf(stderr, "Illegal %s\n", "interface basename");
1413 strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1414 if (!*(rule - 1)) *(rule - 1) = ',';
1415 rule = strtok(NULL, ",");
1417 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1420 snmp_rules[i].base = atoi(rule);
1422 rule = strtok(NULL, ":");
1426 if (parms[tflag].count)
1427 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1428 if (parms[aflag].count) {
1429 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1431 fprintf(stderr, "Illegal %s\n", "source address");
1434 saddr = *((struct sockaddr_in *) res->ai_addr);
1438 if (parms[uflag].count)
1439 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1440 fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1445 /* Process collectors parameters. Brrrr... :-[ */
1447 npeers = argc - optind;
1449 /* Send to remote Netflow collector */
1450 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1451 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1453 if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1455 if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1456 fprintf(stderr, "socket(): %s\n", strerror(errno));
1459 peers[npeers].write_fd = write_fd;
1460 peers[npeers].type = PEER_MIRROR;
1461 peers[npeers].laddr = saddr;
1462 peers[npeers].seq = 0;
1463 if ((lhost = strchr(dport, '/'))) {
1465 if ((type = strchr(lhost, '/'))) {
1473 peers[npeers].type = PEER_ROTATE;
1482 if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1483 peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1487 if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1488 sizeof(struct sockaddr_in))) {
1489 fprintf(stderr, "bind(): %s\n", strerror(errno));
1492 if (getaddrinfo(dhost, dport, &hints, &res)) {
1494 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1497 peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1499 if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1500 sizeof(struct sockaddr_in))) {
1501 fprintf(stderr, "connect(): %s\n", strerror(errno));
1505 /* Restore command line */
1506 if (type) *--type = '/';
1507 if (lhost) *--lhost = '/';
/* -f: single PEER_FILE target instead of network collectors.
 * NOTE(review): malloc(strnlen(arg,MAX_PATH_LEN)) reserves no byte for the
 * NUL terminator, and the following strncpy bounds by MAX_PATH_LEN, not the
 * allocation — potential one-byte heap overrun / unterminated string. */
1511 else if (parms[fflag].count) {
1513 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1514 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1515 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1517 peers[npeers].write_fd = START_VALUE;
1518 peers[npeers].type = PEER_FILE;
1519 peers[npeers].seq = 0;
1528 if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1529 ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1531 fprintf(stderr, "libipulog initialization error: %s",
1532 ipulog_strerror(ipulog_errno));
/* Reaching into the private handle for the raw socket fd (-B option). */
1536 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1537 &sockbufsize, sizeof(sockbufsize)) < 0)
1538 fprintf(stderr, "setsockopt(): %s", strerror(errno));
1540 /* Daemonize (if log destination stdout-free) */
1542 my_log_open(ident, verbosity, log_dest);
1546 if (!(log_dest & 2)) {
1547 /* Crash-proofing - Sapan*/
1551 fprintf(stderr, "fork(): %s", strerror(errno));
1556 freopen("/dev/null", "r", stdin);
1557 freopen("/dev/null", "w", stdout);
1558 freopen("/dev/null", "w", stderr);
1562 while (wait3(NULL,0,NULL) < 1);
1566 setvbuf(stdout, (char *)0, _IONBF, 0);
1567 setvbuf(stderr, (char *)0, _IONBF, 0);
1571 sprintf(errpbuf, "[%ld]", (long) pid);
1572 strcat(ident, errpbuf);
1574 /* Initialization */
1576 hash_init(); /* Actually for crc16 only */
1577 mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1578 for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1581 /* Hope 12 days is enough :-/ */
1582 start_time_offset = 1 << 20;
1584 /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1586 gettime(&start_time);
1589 Build static pending queue as circular buffer.
1591 if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1592 pending_tail = pending_head;
1593 for (i = pending_queue_length - 1; i--;) {
1594 if (!(pending_tail->next = mem_alloc())) {
1596 my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1599 pending_tail = pending_tail->next;
1601 pending_tail->next = pending_head;
1602 pending_tail = pending_head;
1604 sigemptyset(&sig_mask);
1605 sigact.sa_handler = &sighandler;
1606 sigact.sa_mask = sig_mask;
1607 sigact.sa_flags = 0;
1608 sigaddset(&sig_mask, SIGTERM);
1609 sigaction(SIGTERM, &sigact, 0);
1610 #if ((DEBUG) & DEBUG_I)
1611 sigaddset(&sig_mask, SIGUSR1);
1612 sigaction(SIGUSR1, &sigact, 0);
1614 if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1615 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1619 my_log(LOG_INFO, "Starting %s...", VERSION);
1621 if (parms[cflag].count) {
1622 if (chdir(parms[cflag].arg) || chroot(".")) {
1623 my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1628 schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1629 pthread_attr_init(&tattr);
1630 for (i = 0; i < THREADS - 1; i++) {
1631 if (schedp.sched_priority > 0) {
1632 if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1633 (pthread_attr_setschedparam(&tattr, &schedp))) {
1634 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1638 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1639 my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1642 pthread_detach(thid);
1643 schedp.sched_priority++;
1647 if (setgroups(0, NULL)) {
1648 my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1651 if (setregid(pw->pw_gid, pw->pw_gid)) {
1652 my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1655 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1656 my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1661 if (!(pidfile = fopen(pidfilepath, "w")))
1662 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1664 fprintf(pidfile, "%ld\n", (long) pid);
1668 my_log(LOG_INFO, "pid: %d", pid);
1669 my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1670 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1671 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1672 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1673 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1674 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1675 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1676 for (i = 0; i < nsnmp_rules; i++) {
1677 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1678 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1680 for (i = 0; i < npeers; i++) {
1681 switch (peers[i].type) {
1689 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1690 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1691 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1694 pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1696 timeout.tv_usec = 0;
1698 || (total_elements - free_elements - pending_queue_length)
1700 || pending_tail->flags) {
1703 timeout.tv_sec = scan_interval;
1704 select(0, 0, 0, 0, &timeout);
1707 if (sigs & SIGTERM_MASK && !killed) {
1708 sigs &= ~SIGTERM_MASK;
1709 my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1712 active_lifetime = -1;
1713 inactive_lifetime = -1;
1715 unpending_timeout = 1;
1717 pthread_cond_signal(&scan_cond);
1718 pthread_cond_signal(&unpending_cond);
1721 #if ((DEBUG) & DEBUG_I)
1722 if (sigs & SIGUSR1_MASK) {
1723 sigs &= ~SIGUSR1_MASK;
1728 remove(pidfilepath);
1729 #if ((DEBUG) & DEBUG_I)
1732 my_log(LOG_INFO, "Done.");