2 Copyright (C) Slava Astashonok <sla@0n.ru>
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License.
7 $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
9 Sapan Bhatia <sapanb@cs.princeton.edu>
11 7/11/2007 Added data collection (-f) functionality, xid support in the header and log file
14 15/11/2007 Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
20 /* stdout, stderr, freopen() */
26 /* getopt(), alarm(), getpid(), sedsid(), chdir() */
39 #include <libipulog/libipulog.h>
40 struct ipulog_handle {
43 struct sockaddr_nl local;
44 struct sockaddr_nl peer;
45 struct nlmsghdr* last_nlhdr;
48 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
49 #include <sys/types.h>
50 #include <netinet/in_systm.h>
51 #include <sys/socket.h>
52 #include <netinet/in.h>
53 #include <arpa/inet.h>
54 #include <netinet/ip.h>
55 #include <netinet/tcp.h>
56 #include <netinet/udp.h>
57 #include <netinet/ip_icmp.h>
60 #include <sys/param.h>
85 #include <sys/select.h>
91 #include <fprobe-ulog.h>
93 #include <my_getopt.h>
98 #define PIDFILE "/var/log/fprobe-ulog.pid"
99 #define LAST_EPOCH_FILE "/var/log/fprobe_last_epoch"
100 #define MAX_EPOCH_SIZE sizeof("32767")
101 #define STD_NETFLOW_PDU
/* Command-line option table for my_getopt(): every listed option takes a
 * required argument.  Options are later referenced by <letter>flag indices
 * (e.g. parms[Uflag]).  NOTE(review): entries for some letters (and the
 * closing "};") fall outside this sampled view. */
131 static struct getopt_parms parms[] = {
132 {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
133 {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
134 {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
135 {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
136 {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
137 {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
138 {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
139 {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
140 {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
141 {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
143 {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
144 {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
146 {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
147 {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
148 {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
149 {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
150 {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
151 {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
152 {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
153 {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
154 {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
155 {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
/* --- getopt state provided by libc / my_getopt --- */
160 extern int optind, opterr, optopt;
/* NetFlow export-format descriptors (v1/v5/v7), defined elsewhere. */
163 extern struct NetFlow NetFlow1;
164 extern struct NetFlow NetFlow5;
165 extern struct NetFlow NetFlow7;
/* Sentinel meaning "data file not opened yet" (see get_data_file_fd()). */
167 #define START_DATA_FD -5
168 #define mark_is_tos parms[Mflag].count
/* Tunables, all overridable from the command line (defaults in []
 * of the usage text): */
169 static unsigned scan_interval = 5;
170 static unsigned int min_free = 0;
171 static int frag_lifetime = 30;
172 static int inactive_lifetime = 60;
173 static int active_lifetime = 300;
174 static int sockbufsize;
/* Largest representable bulk size for the allocator's index type. */
175 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
176 #if (MEM_BITS == 0) || (MEM_BITS == 16)
177 #define BULK_QUANTITY 10000
179 #define BULK_QUANTITY 200
/* Epoch bookkeeping for file-based logging (-f/-T/-E/-W options). */
182 static unsigned epoch_length=60, log_epochs=1;
183 static unsigned cur_epoch=0,prev_uptime=0,last_peak=0;
185 static unsigned bulk_quantity = BULK_QUANTITY;
186 static unsigned pending_queue_length = 100;
187 static struct NetFlow *netflow = &NetFlow5;
188 static unsigned verbosity = 6;
189 static unsigned log_dest = MY_LOG_SYSLOG;
190 static struct Time start_time;
191 static long start_time_offset;
/* Allocator statistics exported by the mem_* module. */
194 extern unsigned total_elements;
195 extern unsigned free_elements;
196 extern unsigned total_memory;
/* Instrumentation counters, compiled in only with DEBUG_I. */
197 #if ((DEBUG) & DEBUG_I)
198 static unsigned emit_pkts, emit_queue;
199 static uint64_t size_total;
200 static unsigned pkts_total, pkts_total_fragmented;
201 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
202 static unsigned pkts_pending, pkts_pending_done;
203 static unsigned pending_queue_trace, pending_queue_trace_candidate;
204 static unsigned flows_total, flows_fragmented;
/* Emitter state: flows per PDU, sequence number, rate limiting. */
206 static unsigned emit_count;
207 static uint32_t emit_sequence;
208 static unsigned emit_rate_bytes, emit_rate_delay;
209 static struct Time emit_time;
210 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
211 static pthread_t thid;
212 static sigset_t sig_mask;
213 static struct sched_param schedp;
214 static int sched_min, sched_max;
215 static int npeers, npeers_rot;
216 static struct peer *peers;
/* Flow hash table plus one mutex per bucket. */
219 static struct Flow *flows[1 << HASH_BITS];
220 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
/* Synchronization between capture, unpending, scan and emit threads. */
222 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
223 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
225 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
226 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
227 static struct Flow *pending_head, *pending_tail;
228 static struct Flow *scan_frag_dreg;
230 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
231 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
232 static struct Flow *flows_emit;
/* Process identity / capture-source state. */
234 static char ident[256] = "fprobe-ulog";
235 static FILE *pidfile;
236 static char *pidfilepath;
239 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
240 static struct ipulog_handle *ulog_handle;
241 static uint32_t ulog_gmask = 1;
242 static char *cap_buf;
243 static int nsnmp_rules;
244 static struct snmp_rule *snmp_rules;
245 static struct passwd *pw = 0;
250 "fprobe-ulog: a NetFlow probe. Version %s\n"
251 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
253 "-h\t\tDisplay this help\n"
254 "-U <mask>\tULOG group bitwise mask [1]\n"
255 "-s <seconds>\tHow often scan for expired flows [5]\n"
256 "-g <seconds>\tFragmented flow lifetime [30]\n"
257 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
258 "-f <filename>\tLog flow data in a file\n"
259 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
260 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
261 "-a <address>\tUse <address> as source for NetFlow flow\n"
262 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
263 "-M\t\tUse netfilter mark value as ToS flag\n"
264 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
265 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
266 "-q <flows>\tPending queue length [100]\n"
267 "-B <kilobytes>\tKernel capture buffer size [0]\n"
268 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
269 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
270 "-c <directory>\tDirectory to chroot to\n"
271 "-u <user>\tUser to run as\n"
272 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
273 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
274 "-y <remote:port>\tAddress of the NetFlow collector\n"
275 "-f <writable file>\tFile to write data into\n"
276 "-T <n>\tRotate log file every n epochs\n"
277 "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
278 "-E <[1..60]>\tSize of an epoch in minutes\n"
279 "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
281 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
285 #if ((DEBUG) & DEBUG_I)
288 my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
289 pkts_total, pkts_total_fragmented, size_total,
290 pkts_pending - pkts_pending_done, pending_queue_trace);
291 my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
292 pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
293 my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
294 flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
295 my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
296 total_elements, free_elements, total_memory);
/* Async-signal handler: records which signal arrived by setting a bit in
 * the global 'sigs' mask; the main loop acts on the mask later.
 * NOTE(review): the dispatch (switch/if on 'sig') between these two
 * assignments is not present in this sampled view. */
300 void sighandler(int sig)
304 sigs |= SIGTERM_MASK;
306 #if ((DEBUG) & DEBUG_I)
308 sigs |= SIGUSR1_MASK;
/* Fills *now with the current wall-clock time (sec/usec).  The
 * gettimeofday() call and the 'now->sec' assignment are in lines not
 * included in this sampled view. */
314 void gettime(struct Time *now)
320 now->usec = t.tv_usec;
/* Difference t1 - t2 in whole minutes (ignores the usec fields). */
324 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
326 return (t1->sec - t2->sec)/60;
/* Difference t1 - t2 in milliseconds; positive when t1 is later. */
329 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
331 return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
334 /* Uptime in milliseconds (relative to start_time). */
335 uint32_t getuptime(struct Time *t)
337 /* Maximum uptime is about 49/2 days: 2^32 ms before the counter wraps. */
338 return cmpmtime(t, &start_time);
341 /* Uptime in minutes (relative to start_time); used for epoch rotation. */
342 uint32_t getuptime_minutes(struct Time *t)
344 /* Maximum uptime is about 49/2 days */
345 return cmpMtime(t, &start_time);
/* Hash a flow for the bucket table.  Fragmented flows hash only the
 * fragment-key prefix (struct Flow_F); unfragmented flows also include
 * the transport-layer fields (struct Flow_TL). */
348 hash_t hash_flow(struct Flow *flow)
350 if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
351 else return hash(flow, sizeof(struct Flow_TL));
/* Map an interface name to an SNMP ifIndex.  First tries the user-supplied
 * -X rules (basename prefix match, numeric suffix + base); otherwise falls
 * back to the kernel's if_nametoindex().  Empty name => 0. */
354 uint16_t snmp_index(char *name) {
357 if (!*name) return 0;
359 for (i = 0; (int) i < nsnmp_rules; i++) {
360 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
/* Rule matched: index = <numeric tail of name> + rule base. */
361 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
364 if ((i = if_nametoindex(name))) return i;
/* Field-by-field copy of a flow record (src -> dst).  NOTE(review): the
 * copies of sip/dip/sp/dp/iif/oif etc. sit in lines outside this sampled
 * view; only the fields below are visible here. */
369 inline void copy_flow(struct Flow *src, struct Flow *dst)
376 dst->proto = src->proto;
377 dst->tcp_flags = src->tcp_flags;
381 dst->pkts = src->pkts;
382 dst->size = src->size;
383 dst->sizeF = src->sizeF;
384 dst->sizeP = src->sizeP;
385 dst->ctime = src->ctime;
386 dst->mtime = src->mtime;
387 dst->flags = src->flags;
/* Restore cur_epoch from LAST_EPOCH_FILE so a restarted fprobe does not
 * overwrite the most recent epoch's log file.
 * NOTE(review): cur_epoch is declared 'unsigned' but scanned with "%d";
 * "%u" would match the type.  Also, NUL-termination of snum after read()
 * presumably happens in a hidden line (snum[len]=0) — verify. */
390 void read_cur_epoch() {
392 /* Reset to -1 in case the read fails */
394 fd = open(LAST_EPOCH_FILE, O_RDONLY);
396 char snum[MAX_EPOCH_SIZE];
398 len = read(fd, snum, MAX_EPOCH_SIZE-1);
401 sscanf(snum,"%d",&cur_epoch);
402 cur_epoch++; /* Let's not stone the last epoch */
410 /* Dumps the current epoch in a file to cope with
411 * reboots and killings of fprobe */
/* Persist epoch number n (decimal ASCII) into LAST_EPOCH_FILE.
 * On open() failure we only log; the next restart then resumes from 0.
 * FIX(review): open() with O_CREAT *requires* a third (mode) argument;
 * omitting it reads indeterminate permission bits (POSIX UB) — added 0644. */
413 void update_cur_epoch_file(int n) {
415 char snum[MAX_EPOCH_SIZE];
416 len=snprintf(snum, MAX_EPOCH_SIZE-1,"%d", n);
417 fd = open(LAST_EPOCH_FILE, O_WRONLY|O_CREAT|O_TRUNC, 0644);
419 my_log(LOG_ERR, "open() failed: %s.The next restart will resume logging from epoch id 0.",LAST_EPOCH_FILE);
/* Best-effort write; a short write only costs epoch continuity, not data.
 * close(fd) is expected in a line outside this sampled view. */
422 write(fd, snum, len);
426 /* Get the file descriptor corresponding to the current file.
427 * The kludgy implementation is to abstract away the 'current
428 * file descriptor', which may also be a socket.
/* Rotates to "<fname>.<epoch>" when the epoch length elapses, on startup,
 * or when wrapping back to epoch 0; also drops data when the disk is
 * nearly full (-D threshold).
 * FIX(review): open() with O_CREAT *requires* a mode argument; omitting it
 * yields indeterminate file permissions (POSIX UB) — added 0644.
 * NOTE(review): cur_epoch is 'unsigned', so 'cur_epoch==-1' only matches
 * the UINT_MAX sentinel set by read_cur_epoch()'s failure path — confirm. */
431 unsigned get_data_file_fd(char *fname, int cur_fd) {
435 struct statfs statfs;
438 /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
439 * doesn't solve the problem */
441 cur_uptime = getuptime_minutes(&now);
443 if (cur_fd != START_DATA_FD) {
444 if (fstatfs(cur_fd, &statfs) == -1) {
445 my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
/* Disk nearly full and we are already at the highest epoch ever written:
 * overwriting our own files cannot reclaim space, so data gets dropped. */
448 if (min_free && (statfs.f_bavail < min_free)
449 && (cur_epoch==last_peak))
451 my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
456 assume that we can reclaim space by overwriting our own files
457 and that the difference in size will not fill the disk - sapan
462 /* If epoch length has been exceeded,
463 * or we're starting up
464 * or we're going back to the first epoch */
465 if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
466 char nextname[MAX_PATH_LEN];
468 prev_uptime = cur_uptime;
469 cur_epoch = (cur_epoch + 1) % log_epochs;
470 if (cur_epoch>last_peak) last_peak = cur_epoch;
473 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
474 if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC, 0644)) < 0) {
475 my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
/* Persist the epoch so a restart resumes after this one. */
478 update_cur_epoch_file(cur_epoch);
/* Walk the chain starting at 'where' looking for a flow matching 'what':
 * same src/dst IP and protocol, then either matching ports (both
 * unfragmented) or matching IP id (both fragmented).  If 'prev' is given
 * it is seeded from and updated to point at the matched node's back-link,
 * so callers can unlink in O(1).  The loop header, 'done:' label and
 * return are in lines outside this sampled view. */
487 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
489 struct Flow **flowpp;
495 if (prev) flowpp = *prev;
498 if (where->sip.s_addr == what->sip.s_addr
499 && where->dip.s_addr == what->dip.s_addr
500 && where->proto == what->proto) {
/* Sum of flags is fragment-class: 0 = both unfragmented, 2*FLOW_FRAG =
 * both fragmented; mixed classes fall through to the next node. */
501 switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
503 /* Both unfragmented */
504 if ((what->sp == where->sp)
505 && (what->dp == where->dp)) goto done;
508 /* Both fragmented */
509 if (where->id == what->id) goto done;
513 flowpp = &where->next;
517 if (prev) *prev = flowpp;
/* Insert 'flow' into the hash table bucket flows[h] under flows_mutex[h].
 * flag==COPY_INTO allocates a fresh node and copies; flag==MOVE_INTO links
 * the node itself (and frees 'flow' when it was merged into an existing
 * entry).  Merging updates times, flags, byte/packet counters; when the
 * last fragment of a fragmented flow arrives the reassembled flow is
 * unlinked and re-inserted via a recursive MOVE_INTO call (after the
 * bucket lock is released).  Several closing braces / returns of this
 * function are in lines outside this sampled view. */
521 int put_into(struct Flow *flow, int flag
522 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
529 struct Flow *flown, **flowpp;
530 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
535 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
536 sprintf(buf, " %x H:%04x", (unsigned) flow, h);
539 pthread_mutex_lock(&flows_mutex[h]);
541 if (!(flown = find(flows[h], flow, &flowpp))) {
542 /* No suitable flow found - add */
543 if (flag == COPY_INTO) {
544 if ((flown = mem_alloc())) {
545 copy_flow(flow, flown);
/* Allocation failure: the packet is accounted as lost, not fatal. */
548 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
549 my_log(LOG_ERR, "%s %s. %s",
550 "mem_alloc():", strerror(errno), "packet lost");
/* Push onto the bucket's singly-linked chain head. */
555 flow->next = flows[h];
557 #if ((DEBUG) & DEBUG_I)
559 if (flow->flags & FLOW_FRAG) flows_fragmented++;
561 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
563 sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
568 /* Found suitable flow - update */
569 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
570 sprintf(buf, " +> %x", (unsigned) flown);
/* Keep the earliest ctime and the latest mtime across merged packets. */
573 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
574 flown->mtime = flow->mtime;
575 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
576 flown->ctime = flow->ctime;
577 flown->tcp_flags |= flow->tcp_flags;
578 flown->size += flow->size;
579 flown->pkts += flow->pkts;
580 if (flow->flags & FLOW_FRAG) {
581 /* Fragmented flow require some additional work */
582 if (flow->flags & FLOW_TL) {
585 Several packets with FLOW_TL (attack)
587 flown->sp = flow->sp;
588 flown->dp = flow->dp;
590 if (flow->flags & FLOW_LASTFRAG) {
593 Several packets with FLOW_LASTFRAG (attack)
595 flown->sizeP = flow->sizeP;
597 flown->flags |= flow->flags;
598 flown->sizeF += flow->sizeF;
/* sizeF (bytes seen) reached sizeP (total datagram size): complete. */
599 if ((flown->flags & FLOW_LASTFRAG)
600 && (flown->sizeF >= flown->sizeP)) {
601 /* All fragments received - flow reassembled */
602 *flowpp = flown->next;
603 pthread_mutex_unlock(&flows_mutex[h]);
604 #if ((DEBUG) & DEBUG_I)
609 flown->flags &= ~FLOW_FRAG;
610 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
/* Re-insert the reassembled flow as a normal (unfragmented) one. */
613 ret = put_into(flown, MOVE_INTO
614 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
620 if (flag == MOVE_INTO) mem_free(flow);
622 pthread_mutex_unlock(&flows_mutex[h]);
/* Serialize 'fields' NetFlow fields described by 'format' into the packet
 * buffer at 'p', taking values from 'flow' (per-flow fields) or from the
 * emitter globals (header fields, flow==0).  Returns the advanced write
 * pointer.  NOTE(review): arithmetic on a void* is a GCC extension; several
 * case labels, 'break's and the closing braces are in lines outside this
 * sampled view.  Multi-byte values are converted to network byte order
 * except sp/dp, which are already captured in network order. */
626 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
630 for (i = 0; i < fields; i++) {
631 #if ((DEBUG) & DEBUG_F)
632 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
635 case NETFLOW_IPV4_SRC_ADDR:
636 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
637 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
640 case NETFLOW_IPV4_DST_ADDR:
641 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
/* Hard-coded test hook: logs whenever the destination is 64.34.177.39. */
642 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
643 my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
645 p += NETFLOW_IPV4_DST_ADDR_SIZE;
648 case NETFLOW_INPUT_SNMP:
649 *((uint16_t *) p) = htons(flow->iif);
650 p += NETFLOW_INPUT_SNMP_SIZE;
653 case NETFLOW_OUTPUT_SNMP:
654 *((uint16_t *) p) = htons(flow->oif);
655 p += NETFLOW_OUTPUT_SNMP_SIZE;
658 case NETFLOW_PKTS_32:
659 *((uint32_t *) p) = htonl(flow->pkts);
660 p += NETFLOW_PKTS_32_SIZE;
663 case NETFLOW_BYTES_32:
664 *((uint32_t *) p) = htonl(flow->size);
665 p += NETFLOW_BYTES_32_SIZE;
668 case NETFLOW_FIRST_SWITCHED:
669 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
670 p += NETFLOW_FIRST_SWITCHED_SIZE;
673 case NETFLOW_LAST_SWITCHED:
674 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
675 p += NETFLOW_LAST_SWITCHED_SIZE;
678 case NETFLOW_L4_SRC_PORT:
679 *((uint16_t *) p) = flow->sp;
680 p += NETFLOW_L4_SRC_PORT_SIZE;
683 case NETFLOW_L4_DST_PORT:
684 *((uint16_t *) p) = flow->dp;
685 p += NETFLOW_L4_DST_PORT_SIZE;
689 *((uint8_t *) p) = flow->proto;
690 p += NETFLOW_PROT_SIZE;
693 case NETFLOW_SRC_TOS:
694 *((uint8_t *) p) = flow->tos;
695 p += NETFLOW_SRC_TOS_SIZE;
698 case NETFLOW_TCP_FLAGS:
699 *((uint8_t *) p) = flow->tcp_flags;
700 p += NETFLOW_TCP_FLAGS_SIZE;
/* --- header fields below use emitter globals, not 'flow' --- */
703 case NETFLOW_VERSION:
704 *((uint16_t *) p) = htons(netflow->Version);
705 p += NETFLOW_VERSION_SIZE;
709 *((uint16_t *) p) = htons(emit_count);
710 p += NETFLOW_COUNT_SIZE;
714 *((uint32_t *) p) = htonl(getuptime(&emit_time));
715 p += NETFLOW_UPTIME_SIZE;
718 case NETFLOW_UNIX_SECS:
719 *((uint32_t *) p) = htonl(emit_time.sec);
720 p += NETFLOW_UNIX_SECS_SIZE;
723 case NETFLOW_UNIX_NSECS:
724 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
725 p += NETFLOW_UNIX_NSECS_SIZE;
728 case NETFLOW_FLOW_SEQUENCE:
/* Sequence numbering deliberately disabled (always 0). */
729 //*((uint32_t *) p) = htonl(emit_sequence);
730 *((uint32_t *) p) = 0;
731 p += NETFLOW_FLOW_SEQUENCE_SIZE;
735 /* Unsupported (uint8_t) */
736 case NETFLOW_ENGINE_TYPE:
737 case NETFLOW_ENGINE_ID:
738 case NETFLOW_FLAGS7_1:
739 case NETFLOW_SRC_MASK:
740 case NETFLOW_DST_MASK:
741 *((uint8_t *) p) = 0;
742 p += NETFLOW_PAD8_SIZE;
/* XID slot carries the (slice) ToS value — see the 7/11/2007 changelog. */
745 *((uint16_t *) p) = flow->tos;
746 p += NETFLOW_XID_SIZE;
749 /* Unsupported (uint16_t) */
752 case NETFLOW_FLAGS7_2:
753 *((uint16_t *) p) = 0;
754 p += NETFLOW_PAD16_SIZE;
758 /* Unsupported (uint32_t) */
759 case NETFLOW_IPV4_NEXT_HOP:
760 case NETFLOW_ROUTER_SC:
761 *((uint32_t *) p) = 0;
762 p += NETFLOW_PAD32_SIZE;
766 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
767 format, i, format[i]);
771 #if ((DEBUG) & DEBUG_F)
772 my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
779 Workaround for clone()-based threads
780 Try to change EUID independently of main thread
784 setregid(pw->pw_gid, pw->pw_gid);
785 setreuid(pw->pw_uid, pw->pw_uid);
794 struct timespec timeout;
795 int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
797 p = (void *) &emit_packet + netflow->HeaderSize;
803 pthread_mutex_lock(&emit_mutex);
804 while (!flows_emit) {
805 gettimeofday(&now, 0);
806 timeout.tv_sec = now.tv_sec + emit_timeout;
807 /* Do not wait until emit_packet will filled - it may be too long */
808 if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
809 pthread_mutex_unlock(&emit_mutex);
814 flows_emit = flows_emit->next;
815 #if ((DEBUG) & DEBUG_I)
818 pthread_mutex_unlock(&emit_mutex);
822 gettime(&start_time);
823 start_time.sec -= start_time_offset;
826 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
830 printf("Emit count = %d\n", emit_count);
833 if (emit_count == netflow->MaxFlows) {
836 p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
837 size = netflow->HeaderSize + emit_count * netflow->FlowSize;
838 /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
839 #ifdef STD_NETFLOW_PDU
840 if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
843 for (i = 0; i < npeers; i++) {
844 if (peers[i].type == PEER_FILE) {
845 if (netflow->SeqOffset)
846 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
847 peers[i].write_fd = get_data_file_fd(peers[i].fname, peers[i].write_fd);
848 ret = write(peers[i].write_fd, emit_packet, size);
851 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
852 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
853 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
857 #if ((DEBUG) & DEBUG_E)
859 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
860 emit_count, i + 1, peers[i].seq);
863 peers[i].seq += emit_count;
866 if (emit_rate_bytes) {
868 delay = sent / emit_rate_bytes;
870 sent %= emit_rate_bytes;
872 timeout.tv_nsec = emit_rate_delay * delay;
873 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
878 if (peers[i].type == PEER_MIRROR) goto sendreal;
880 if (peers[i].type == PEER_ROTATE)
881 if (peer_rot_cur++ == peer_rot_work) {
883 if (netflow->SeqOffset)
884 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
885 ret = send(peers[i].write_fd, emit_packet, size, 0);
887 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
888 my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
889 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
892 #if ((DEBUG) & DEBUG_E)
894 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
895 emit_count, i + 1, peers[i].seq);
898 peers[i].seq += emit_count;
901 if (emit_rate_bytes) {
903 delay = sent / emit_rate_bytes;
905 sent %= emit_rate_bytes;
907 timeout.tv_nsec = emit_rate_delay * delay;
908 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
913 if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
914 emit_sequence += emit_count;
916 #if ((DEBUG) & DEBUG_I)
/* Consumer thread for the capture ring: waits (with a timeout) until the
 * tail entry of the pending queue is marked FLOW_PENDING by the capture
 * thread, copies it into the flow hash via put_into(COPY_INTO), then
 * clears the flag and advances the tail.  The enclosing for(;;) loop and
 * queue-wrap handling sit in lines outside this sampled view.
 * NOTE(review): only timeout.tv_sec is visibly set; confirm tv_nsec is
 * initialized in a hidden line before pthread_cond_timedwait(). */
923 void *unpending_thread()
926 struct timespec timeout;
927 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
934 pthread_mutex_lock(&unpending_mutex);
937 while (!(pending_tail->flags & FLOW_PENDING)) {
938 gettimeofday(&now, 0);
939 timeout.tv_sec = now.tv_sec + unpending_timeout;
940 pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
943 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
/* put_into failure => the packet could not be stored; count it lost. */
946 if (put_into(pending_tail, COPY_INTO
947 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
951 #if ((DEBUG) & DEBUG_I)
952 pkts_lost_unpending++;
956 #if ((DEBUG) & DEBUG_U)
957 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
/* Slot is free again for the capture thread; move to the next entry. */
960 pending_tail->flags = 0;
961 pending_tail = pending_tail->next;
962 #if ((DEBUG) & DEBUG_I)
970 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
974 struct Flow *flow, **flowpp;
976 struct timespec timeout;
981 pthread_mutex_lock(&scan_mutex);
985 timeout.tv_sec = now.sec + scan_interval;
986 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
989 #if ((DEBUG) & DEBUG_S)
990 my_log(LOG_DEBUG, "S: %d", now.sec);
992 for (i = 0; i < 1 << HASH_BITS ; i++) {
993 pthread_mutex_lock(&flows_mutex[i]);
997 if (flow->flags & FLOW_FRAG) {
998 /* Process fragmented flow */
999 if ((now.sec - flow->mtime.sec) > frag_lifetime) {
1000 /* Fragmented flow expired - put it into special chain */
1001 #if ((DEBUG) & DEBUG_I)
1005 *flowpp = flow->next;
1007 flow->flags &= ~FLOW_FRAG;
1008 flow->next = scan_frag_dreg;
1009 scan_frag_dreg = flow;
1014 /* Flow is not frgamented */
1015 if ((now.sec - flow->mtime.sec) > inactive_lifetime
1016 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
1018 #if ((DEBUG) & DEBUG_S)
1019 my_log(LOG_DEBUG, "S: E %x", flow);
1021 #if ((DEBUG) & DEBUG_I)
1024 *flowpp = flow->next;
1025 pthread_mutex_lock(&emit_mutex);
1026 flow->next = flows_emit;
1028 #if ((DEBUG) & DEBUG_I)
1031 pthread_mutex_unlock(&emit_mutex);
1036 flowpp = &flow->next;
1039 pthread_mutex_unlock(&flows_mutex[i]);
1041 if (flows_emit) pthread_cond_signal(&emit_cond);
1043 while (scan_frag_dreg) {
1044 flow = scan_frag_dreg;
1045 scan_frag_dreg = flow->next;
1046 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1049 put_into(flow, MOVE_INTO
1050 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1054 #if ((DEBUG) & DEBUG_S)
1055 my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
1063 struct ulog_packet_msg *ulog_msg;
1067 int len, off_frag, psize;
1068 #if ((DEBUG) & DEBUG_C)
1076 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1078 my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1081 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1083 #if ((DEBUG) & DEBUG_C)
1084 sprintf(logbuf, "C: %d", ulog_msg->data_len);
1087 nl = (void *) &ulog_msg->payload;
1088 psize = ulog_msg->data_len;
1091 if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1092 #if ((DEBUG) & DEBUG_C)
1093 strcat(logbuf, " U");
1094 my_log(LOG_DEBUG, "%s", logbuf);
1096 #if ((DEBUG) & DEBUG_I)
1102 if (pending_head->flags) {
1103 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1105 # if ((DEBUG) & DEBUG_C)
1110 "pending queue full:", "packet lost");
1112 #if ((DEBUG) & DEBUG_I)
1113 pkts_lost_capture++;
1118 #if ((DEBUG) & DEBUG_I)
1122 flow = pending_head;
1124 /* ?FIXME? Add sanity check for ip_len? */
1125 flow->size = ntohs(nl->ip_len);
1126 #if ((DEBUG) & DEBUG_I)
1127 size_total += flow->size;
1130 flow->sip = nl->ip_src;
1131 flow->dip = nl->ip_dst;
1132 flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1133 if ((flow->dip.s_addr == inet_addr("64.34.177.39")) || (flow->sip.s_addr == inet_addr("64.34.177.39"))) {
1134 my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->tos);
1136 flow->iif = snmp_index(ulog_msg->indev_name);
1137 flow->oif = snmp_index(ulog_msg->outdev_name);
1138 flow->proto = nl->ip_p;
1140 flow->tcp_flags = 0;
1144 /* Packets captured from OUTPUT table didn't contains valid timestamp */
1145 if (ulog_msg->timestamp_sec) {
1146 flow->ctime.sec = ulog_msg->timestamp_sec;
1147 flow->ctime.usec = ulog_msg->timestamp_usec;
1148 } else gettime(&flow->ctime);
1149 flow->mtime = flow->ctime;
1151 off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1154 Offset (from network layer) to transport layer header/IP data
1155 IOW IP header size ;-)
1158 Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1160 off_tl = nl->ip_hl << 2;
1161 tl = (void *) nl + off_tl;
1163 /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1164 flow->sizeF = ntohs(nl->ip_len) - off_tl;
1166 if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1167 if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1169 if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1170 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1171 #if ((DEBUG) & DEBUG_C)
1172 strcat(logbuf, " F");
1174 #if ((DEBUG) & DEBUG_I)
1175 pkts_total_fragmented++;
1177 flow->flags |= FLOW_FRAG;
1178 flow->id = nl->ip_id;
1180 if (!(ntohs(nl->ip_off) & IP_MF)) {
1181 /* Packet whith IP_MF contains information about whole datagram size */
1182 flow->flags |= FLOW_LASTFRAG;
1183 /* size = frag_offset*8 + data_size */
1184 flow->sizeP = off_frag + flow->sizeF;
1188 #if ((DEBUG) & DEBUG_C)
1189 sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1190 strcat(logbuf, buf);
1191 sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1192 strcat(logbuf, buf);
1196 Fortunately most interesting transport layer information fit
1197 into first 8 bytes of IP data field (minimal nonzero size).
1198 Thus we don't need actual packet reassembling to build whole
1199 transport layer data. We only check the fragment offset for
1200 zero value to find packet with this information.
1202 if (!off_frag && psize >= 8) {
1203 switch (flow->proto) {
1206 flow->sp = ((struct udphdr *)tl)->uh_sport;
1207 flow->dp = ((struct udphdr *)tl)->uh_dport;
1212 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1213 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1216 #ifdef ICMP_TRICK_CISCO
1218 flow->dp = *((int32_t *) tl);
1223 /* Unknown transport layer */
1224 #if ((DEBUG) & DEBUG_C)
1225 strcat(logbuf, " U");
1232 #if ((DEBUG) & DEBUG_C)
1233 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1234 strcat(logbuf, buf);
1236 flow->flags |= FLOW_TL;
1240 /* Check for tcp flags presence (including CWR and ECE). */
1241 if (flow->proto == IPPROTO_TCP
1243 && psize >= 16 - off_frag) {
1244 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1245 #if ((DEBUG) & DEBUG_C)
1246 sprintf(buf, " TCP:%x", flow->tcp_flags);
1247 strcat(logbuf, buf);
1251 #if ((DEBUG) & DEBUG_C)
1252 sprintf(buf, " => %x", (unsigned) flow);
1253 strcat(logbuf, buf);
1254 my_log(LOG_DEBUG, "%s", logbuf);
1257 #if ((DEBUG) & DEBUG_I)
1259 pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1260 if (pending_queue_trace < pending_queue_trace_candidate)
1261 pending_queue_trace = pending_queue_trace_candidate;
1264 /* Flow complete - inform unpending_thread() about it */
1265 pending_head->flags |= FLOW_PENDING;
1266 pending_head = pending_head->next;
1268 pthread_cond_signal(&unpending_cond);
1274 /* Copied out of CoDemux */
/* Daemonize (copied out of CoDemux): fork, let the parent write the child
 * pid into PIDFILE and exit, then detach the child (setsid etc.) and chdir
 * to the working directory.  Several lines (return statements, setsid(),
 * fclose of the pidfile) are outside this sampled view.
 * NOTE(review): fopen() result is checked but the fprintf/fclose error
 * paths are not visible here — confirm the pid actually reaches disk. */
1276 static int init_daemon() {
1280 pidfile = fopen(PIDFILE, "w");
1281 if (pidfile == NULL) {
1282 my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
1285 if ((pid = fork()) < 0) {
1287 my_log(LOG_ERR, "Could not fork!\n");
1290 else if (pid != 0) {
1291 /* i'm the parent, writing down the child pid */
1292 fprintf(pidfile, "%u\n", pid);
1297 /* close the pid file */
1300 /* routines for any daemon process
1301 1. create a new session
1302 2. change directory to the root
1303 3. change the file creation permission
1306 chdir("/var/local/fprobe");
1312 int main(int argc, char **argv)
1315 char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1316 int c, i, write_fd, memory_limit = 0;
1317 struct addrinfo hints, *res;
1318 struct sockaddr_in saddr;
1319 pthread_attr_t tattr;
1320 struct sigaction sigact;
1321 static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1322 struct timeval timeout;
1324 sched_min = sched_get_priority_min(SCHED);
1325 sched_max = sched_get_priority_max(SCHED);
1327 memset(&saddr, 0 , sizeof(saddr));
1328 memset(&hints, 0 , sizeof(hints));
1329 hints.ai_flags = AI_PASSIVE;
1330 hints.ai_family = AF_INET;
1331 hints.ai_socktype = SOCK_DGRAM;
1333 /* Process command line options */
1336 while ((c = my_getopt(argc, argv, parms)) != -1) {
1346 if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1347 if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1348 if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1349 if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1350 if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1351 if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1352 if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1353 if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1354 if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1355 if (parms[nflag].count) {
1356 switch (atoi(parms[nflag].arg)) {
1358 netflow = &NetFlow1;
1365 netflow = &NetFlow7;
1369 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1373 if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1374 if (parms[lflag].count) {
1375 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1378 sprintf(errpbuf, "[%s]", log_suffix);
1379 strcat(ident, errpbuf);
1382 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1383 if (log_suffix) *--log_suffix = ':';
1385 if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1387 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1390 sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1391 if (parms[qflag].count) {
1392 pending_queue_length = atoi(parms[qflag].arg);
1393 if (pending_queue_length < 1) {
1394 fprintf(stderr, "Illegal %s\n", "pending queue length");
1398 if (parms[rflag].count) {
1399 schedp.sched_priority = atoi(parms[rflag].arg);
1400 if (schedp.sched_priority
1401 && (schedp.sched_priority < sched_min
1402 || schedp.sched_priority > sched_max)) {
1403 fprintf(stderr, "Illegal %s\n", "realtime priority");
1407 if (parms[Bflag].count) {
1408 sockbufsize = atoi(parms[Bflag].arg) << 10;
1410 if (parms[bflag].count) {
1411 bulk_quantity = atoi(parms[bflag].arg);
1412 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1413 fprintf(stderr, "Illegal %s\n", "bulk size");
1417 if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1418 if (parms[Xflag].count) {
1419 for(i = 0; parms[Xflag].arg[i]; i++)
1420 if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1421 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1423 rule = strtok(parms[Xflag].arg, ":");
1424 for (i = 0; rule; i++) {
1425 snmp_rules[i].len = strlen(rule);
1426 if (snmp_rules[i].len > IFNAMSIZ) {
1427 fprintf(stderr, "Illegal %s\n", "interface basename");
1430 strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1431 if (!*(rule - 1)) *(rule - 1) = ',';
1432 rule = strtok(NULL, ",");
1434 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1437 snmp_rules[i].base = atoi(rule);
1439 rule = strtok(NULL, ":");
1443 if (parms[tflag].count)
1444 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1445 if (parms[aflag].count) {
1446 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1448 fprintf(stderr, "Illegal %s\n", "source address");
1451 saddr = *((struct sockaddr_in *) res->ai_addr);
1455 if (parms[uflag].count)
1456 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1457 fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1462 /* Process collectors parameters. Brrrr... :-[ */
1464 npeers = argc - optind;
1466 /* Send to remote Netflow collector */
1467 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1468 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1470 if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1472 if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1473 fprintf(stderr, "socket(): %s\n", strerror(errno));
1476 peers[npeers].write_fd = write_fd;
1477 peers[npeers].type = PEER_MIRROR;
1478 peers[npeers].laddr = saddr;
1479 peers[npeers].seq = 0;
1480 if ((lhost = strchr(dport, '/'))) {
1482 if ((type = strchr(lhost, '/'))) {
1490 peers[npeers].type = PEER_ROTATE;
1499 if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1500 peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1504 if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1505 sizeof(struct sockaddr_in))) {
1506 fprintf(stderr, "bind(): %s\n", strerror(errno));
1509 if (getaddrinfo(dhost, dport, &hints, &res)) {
1511 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1514 peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1516 if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1517 sizeof(struct sockaddr_in))) {
1518 fprintf(stderr, "connect(): %s\n", strerror(errno));
1522 /* Restore command line */
1523 if (type) *--type = '/';
1524 if (lhost) *--lhost = '/';
1528 else if (parms[fflag].count) {
1530 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1531 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1532 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1534 peers[npeers].write_fd = START_DATA_FD;
1535 peers[npeers].type = PEER_FILE;
1536 peers[npeers].seq = 0;
1545 if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1546 ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1548 fprintf(stderr, "libipulog initialization error: %s",
1549 ipulog_strerror(ipulog_errno));
1553 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1554 &sockbufsize, sizeof(sockbufsize)) < 0)
1555 fprintf(stderr, "setsockopt(): %s", strerror(errno));
1557 /* Daemonize (if log destination stdout-free) */
1559 my_log_open(ident, verbosity, log_dest);
1563 if (!(log_dest & 2)) {
1564 /* Crash-proofing - Sapan*/
1568 fprintf(stderr, "fork(): %s", strerror(errno));
1573 freopen("/dev/null", "r", stdin);
1574 freopen("/dev/null", "w", stdout);
1575 freopen("/dev/null", "w", stderr);
1579 while (wait3(NULL,0,NULL) < 1);
1583 setvbuf(stdout, (char *)0, _IONBF, 0);
1584 setvbuf(stderr, (char *)0, _IONBF, 0);
1588 sprintf(errpbuf, "[%ld]", (long) pid);
1589 strcat(ident, errpbuf);
1591 /* Initialization */
1593 hash_init(); /* Actually for crc16 only */
1594 mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1595 for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1598 /* Hope 12 days is enough :-/ */
1599 start_time_offset = 1 << 20;
1601 /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1603 gettime(&start_time);
1606 Build static pending queue as circular buffer.
1608 if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1609 pending_tail = pending_head;
1610 for (i = pending_queue_length - 1; i--;) {
1611 if (!(pending_tail->next = mem_alloc())) {
1613 my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1616 pending_tail = pending_tail->next;
1618 pending_tail->next = pending_head;
1619 pending_tail = pending_head;
1621 sigemptyset(&sig_mask);
1622 sigact.sa_handler = &sighandler;
1623 sigact.sa_mask = sig_mask;
1624 sigact.sa_flags = 0;
1625 sigaddset(&sig_mask, SIGTERM);
1626 sigaction(SIGTERM, &sigact, 0);
1627 #if ((DEBUG) & DEBUG_I)
1628 sigaddset(&sig_mask, SIGUSR1);
1629 sigaction(SIGUSR1, &sigact, 0);
1631 if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1632 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1636 my_log(LOG_INFO, "Starting %s...", VERSION);
1638 if (parms[cflag].count) {
1639 if (chdir(parms[cflag].arg) || chroot(".")) {
1640 my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1645 schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1646 pthread_attr_init(&tattr);
1647 for (i = 0; i < THREADS - 1; i++) {
1648 if (schedp.sched_priority > 0) {
1649 if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1650 (pthread_attr_setschedparam(&tattr, &schedp))) {
1651 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1655 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1656 my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1659 pthread_detach(thid);
1660 schedp.sched_priority++;
1664 if (setgroups(0, NULL)) {
1665 my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1668 if (setregid(pw->pw_gid, pw->pw_gid)) {
1669 my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1672 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1673 my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1678 if (!(pidfile = fopen(pidfilepath, "w")))
1679 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1681 fprintf(pidfile, "%ld\n", (long) pid);
1685 my_log(LOG_INFO, "pid: %d", pid);
1686 my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1687 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1688 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1689 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1690 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1691 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1692 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1693 for (i = 0; i < nsnmp_rules; i++) {
1694 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1695 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1697 for (i = 0; i < npeers; i++) {
1698 switch (peers[i].type) {
1706 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1707 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1708 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1711 pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1713 timeout.tv_usec = 0;
1715 || (total_elements - free_elements - pending_queue_length)
1717 || pending_tail->flags) {
1720 timeout.tv_sec = scan_interval;
1721 select(0, 0, 0, 0, &timeout);
1724 if (sigs & SIGTERM_MASK && !killed) {
1725 sigs &= ~SIGTERM_MASK;
1726 my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1729 active_lifetime = -1;
1730 inactive_lifetime = -1;
1732 unpending_timeout = 1;
1734 pthread_cond_signal(&scan_cond);
1735 pthread_cond_signal(&unpending_cond);
1738 #if ((DEBUG) & DEBUG_I)
1739 if (sigs & SIGUSR1_MASK) {
1740 sigs &= ~SIGUSR1_MASK;
1745 remove(pidfilepath);
1746 #if ((DEBUG) & DEBUG_I)
1749 my_log(LOG_INFO, "Done.");