2 Copyright (C) Slava Astashonok <sla@0n.ru>
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License.
7 $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
9 Sapan Bhatia <sapanb@cs.princeton.edu>
11 7/11/2007 Added data collection (-f) functionality, xid support in the header and log file
14 15/11/2007 Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
20 /* stdout, stderr, freopen() */
26 /* getopt(), alarm(), getpid(), setsid(), chdir() */
39 #include <libipulog/libipulog.h>
42 struct ipulog_handle {
45 struct sockaddr_nl local;
46 struct sockaddr_nl peer;
47 struct nlmsghdr* last_nlhdr;
50 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
51 #include <sys/types.h>
52 #include <netinet/in_systm.h>
53 #include <sys/socket.h>
54 #include <netinet/in.h>
55 #include <arpa/inet.h>
56 #include <netinet/ip.h>
57 #include <netinet/tcp.h>
58 #include <netinet/udp.h>
59 #include <netinet/ip_icmp.h>
62 #include <sys/param.h>
87 #include <sys/select.h>
93 #include <fprobe-ulog.h>
95 #include <my_getopt.h>
100 #define PIDFILE "/var/log/fprobe-ulog.pid"
101 #define LAST_EPOCH_FILE "/var/log/fprobe_last_epoch"
102 #define MAX_EPOCH_SIZE sizeof("32767")
103 #define STD_NETFLOW_PDU
133 static struct getopt_parms parms[] = {
134 {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
135 {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
136 {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
137 {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
138 {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
139 {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
140 {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
141 {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
142 {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
143 {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
145 {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
146 {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
148 {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
149 {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
150 {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
151 {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
152 {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
153 {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
154 {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
155 {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
156 {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
157 {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
162 extern int optind, opterr, optopt;
165 extern struct NetFlow NetFlow1;
166 extern struct NetFlow NetFlow5;
167 extern struct NetFlow NetFlow7;
169 #define START_DATA_FD -5
170 #define mark_is_tos parms[Mflag].count
171 static unsigned scan_interval = 5;
172 static unsigned int min_free = 0;
173 static int frag_lifetime = 30;
174 static int inactive_lifetime = 60;
175 static int active_lifetime = 300;
176 static int sockbufsize;
177 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
178 #if (MEM_BITS == 0) || (MEM_BITS == 16)
179 #define BULK_QUANTITY 10000
181 #define BULK_QUANTITY 200
184 static unsigned epoch_length=60, log_epochs=1;
185 static unsigned cur_epoch=0,prev_uptime=0,last_peak=0;
187 static unsigned bulk_quantity = BULK_QUANTITY;
188 static unsigned pending_queue_length = 100;
189 static struct NetFlow *netflow = &NetFlow5;
190 static unsigned verbosity = 6;
191 static unsigned log_dest = MY_LOG_SYSLOG;
192 static struct Time start_time;
193 static long start_time_offset;
196 extern unsigned total_elements;
197 extern unsigned free_elements;
198 extern unsigned total_memory;
199 #if ((DEBUG) & DEBUG_I)
200 static unsigned emit_pkts, emit_queue;
201 static uint64_t size_total;
202 static unsigned pkts_total, pkts_total_fragmented;
203 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
204 static unsigned pkts_pending, pkts_pending_done;
205 static unsigned pending_queue_trace, pending_queue_trace_candidate;
206 static unsigned flows_total, flows_fragmented;
208 static unsigned emit_count;
209 static uint32_t emit_sequence;
210 static unsigned emit_rate_bytes, emit_rate_delay;
211 static struct Time emit_time;
212 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
213 static pthread_t thid;
214 static sigset_t sig_mask;
215 static struct sched_param schedp;
216 static int sched_min, sched_max;
217 static int npeers, npeers_rot;
218 static struct peer *peers;
221 static struct Flow *flows[1 << HASH_BITS];
222 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
224 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
225 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
227 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
228 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
229 static struct Flow *pending_head, *pending_tail;
230 static struct Flow *scan_frag_dreg;
232 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
233 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
234 static struct Flow *flows_emit;
236 static char ident[256] = "fprobe-ulog";
237 static FILE *pidfile;
238 static char *pidfilepath;
241 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
242 static struct ipulog_handle *ulog_handle;
243 static uint32_t ulog_gmask = 1;
244 static char *cap_buf;
245 static int nsnmp_rules;
246 static struct snmp_rule *snmp_rules;
247 static struct passwd *pw = 0;
252 "fprobe-ulog: a NetFlow probe. Version %s\n"
253 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
255 "-h\t\tDisplay this help\n"
256 "-U <mask>\tULOG group bitwise mask [1]\n"
257 "-s <seconds>\tHow often scan for expired flows [5]\n"
258 "-g <seconds>\tFragmented flow lifetime [30]\n"
259 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
260 "-f <filename>\tLog flow data in a file\n"
261 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
262 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
263 "-a <address>\tUse <address> as source for NetFlow flow\n"
264 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
265 "-M\t\tUse netfilter mark value as ToS flag\n"
266 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
267 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
268 "-q <flows>\tPending queue length [100]\n"
269 "-B <kilobytes>\tKernel capture buffer size [0]\n"
270 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
271 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
272 "-c <directory>\tDirectory to chroot to\n"
273 "-u <user>\tUser to run as\n"
274 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
275 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
276 "-y <remote:port>\tAddress of the NetFlow collector\n"
277 "-f <writable file>\tFile to write data into\n"
278 "-T <n>\tRotate log file every n epochs\n"
279 "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
280 "-E <[1..60]>\tSize of an epoch in minutes\n"
281 "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
283 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
287 #if ((DEBUG) & DEBUG_I)
290 my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
291 pkts_total, pkts_total_fragmented, size_total,
292 pkts_pending - pkts_pending_done, pending_queue_trace);
293 my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
294 pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
295 my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
296 flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
297 my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
298 total_elements, free_elements, total_memory);
302 void sighandler(int sig)
306 sigs |= SIGTERM_MASK;
308 #if ((DEBUG) & DEBUG_I)
310 sigs |= SIGUSR1_MASK;
316 void gettime(struct Time *now)
322 now->usec = t.tv_usec;
326 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
328 return (t1->sec - t2->sec)/60;
331 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
333 return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
336 /* Uptime in milliseconds */
337 uint32_t getuptime(struct Time *t)
339 /* Maximum uptime is about 49/2 days */
340 return cmpmtime(t, &start_time);
343 /* Uptime in minutes */
344 uint32_t getuptime_minutes(struct Time *t)
346 /* Maximum uptime is about 49/2 days */
347 return cmpMtime(t, &start_time);
350 hash_t hash_flow(struct Flow *flow)
352 if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
353 else return hash(flow, sizeof(struct Flow_TL));
356 uint16_t snmp_index(char *name) {
359 if (!*name) return 0;
361 for (i = 0; (int) i < nsnmp_rules; i++) {
362 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
363 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
366 if ((i = if_nametoindex(name))) return i;
371 inline void copy_flow(struct Flow *src, struct Flow *dst)
379 dst->proto = src->proto;
380 dst->tcp_flags = src->tcp_flags;
384 dst->pkts = src->pkts;
385 dst->size = src->size;
386 dst->sizeF = src->sizeF;
387 dst->sizeP = src->sizeP;
388 dst->ctime = src->ctime;
389 dst->mtime = src->mtime;
390 dst->flags = src->flags;
393 void read_cur_epoch() {
395 /* Reset to -1 in case the read fails */
397 fd = open(LAST_EPOCH_FILE, O_RDONLY);
399 char snum[MAX_EPOCH_SIZE];
401 len = read(fd, snum, MAX_EPOCH_SIZE-1);
404 sscanf(snum,"%d",&cur_epoch);
405 cur_epoch++; /* Let's not stone the last epoch */
413 /* Dumps the current epoch in a file to cope with
414 * reboots and killings of fprobe */
416 void update_cur_epoch_file(int n) {
418 char snum[MAX_EPOCH_SIZE];
419 len=snprintf(snum, MAX_EPOCH_SIZE-1,"%d", n);
420 fd = open(LAST_EPOCH_FILE, O_RDWR|O_CREAT|O_TRUNC);
422 my_log(LOG_ERR, "open() failed: %s.The next restart will resume logging from epoch id 0.",LAST_EPOCH_FILE);
425 write(fd, snum, len);
429 /* Get the file descriptor corresponding to the current file.
430 * The kludgy implementation is to abstract away the 'current
431 * file descriptor', which may also be a socket.
434 unsigned get_data_file_fd(char *fname, int cur_fd) {
438 struct statfs statfs;
441 /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
442 * doesn't solve the problem */
444 cur_uptime = getuptime_minutes(&now);
446 if (cur_fd != START_DATA_FD) {
447 if (fstatfs(cur_fd, &statfs) == -1) {
448 my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
451 if (min_free && (statfs.f_bavail < min_free)
452 && (cur_epoch==last_peak))
454 my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
459 assume that we can reclaim space by overwriting our own files
460 and that the difference in size will not fill the disk - sapan
465 /* If epoch length has been exceeded,
466 * or we're starting up
467 * or we're going back to the first epoch */
468 if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
469 char nextname[MAX_PATH_LEN];
471 prev_uptime = cur_uptime;
472 cur_epoch = (cur_epoch + 1) % log_epochs;
473 if (cur_epoch>last_peak) last_peak = cur_epoch;
476 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
477 if ((write_fd = open(nextname, O_RDWR|O_CREAT|O_TRUNC,S_IRWXU|S_IRGRP|S_IROTH)) < 0) {
478 my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
481 update_cur_epoch_file(cur_epoch);
490 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
492 struct Flow **flowpp;
498 if (prev) flowpp = *prev;
501 if (where->sip.s_addr == what->sip.s_addr
502 && where->dip.s_addr == what->dip.s_addr
503 && where->proto == what->proto) {
504 switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
506 /* Both unfragmented */
507 if ((what->sp == where->sp)
508 && (what->dp == where->dp)) goto done;
511 /* Both fragmented */
512 if (where->id == what->id) goto done;
516 flowpp = &where->next;
520 if (prev) *prev = flowpp;
524 int put_into(struct Flow *flow, int flag
525 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
532 struct Flow *flown, **flowpp;
533 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
538 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
539 sprintf(buf, " %x H:%04x", (unsigned) flow, h);
542 pthread_mutex_lock(&flows_mutex[h]);
544 if (!(flown = find(flows[h], flow, &flowpp))) {
545 /* No suitable flow found - add */
546 if (flag == COPY_INTO) {
547 if ((flown = mem_alloc())) {
548 copy_flow(flow, flown);
551 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
552 my_log(LOG_ERR, "%s %s. %s",
553 "mem_alloc():", strerror(errno), "packet lost");
558 flow->next = flows[h];
560 #if ((DEBUG) & DEBUG_I)
562 if (flow->flags & FLOW_FRAG) flows_fragmented++;
564 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
566 sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
571 /* Found suitable flow - update */
572 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
573 sprintf(buf, " +> %x", (unsigned) flown);
576 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
577 flown->mtime = flow->mtime;
578 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
579 flown->ctime = flow->ctime;
580 flown->tcp_flags |= flow->tcp_flags;
581 flown->size += flow->size;
582 flown->pkts += flow->pkts;
583 if (flow->flags & FLOW_FRAG) {
584 /* Fragmented flow require some additional work */
585 if (flow->flags & FLOW_TL) {
588 Several packets with FLOW_TL (attack)
590 flown->sp = flow->sp;
591 flown->dp = flow->dp;
593 if (flow->flags & FLOW_LASTFRAG) {
596 Several packets with FLOW_LASTFRAG (attack)
598 flown->sizeP = flow->sizeP;
600 flown->flags |= flow->flags;
601 flown->sizeF += flow->sizeF;
602 if ((flown->flags & FLOW_LASTFRAG)
603 && (flown->sizeF >= flown->sizeP)) {
604 /* All fragments received - flow reassembled */
605 *flowpp = flown->next;
606 pthread_mutex_unlock(&flows_mutex[h]);
607 #if ((DEBUG) & DEBUG_I)
612 flown->flags &= ~FLOW_FRAG;
613 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
616 ret = put_into(flown, MOVE_INTO
617 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
623 if (flag == MOVE_INTO) mem_free(flow);
625 pthread_mutex_unlock(&flows_mutex[h]);
631 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
635 for (i = 0; i < fields; i++) {
636 #if ((DEBUG) & DEBUG_F)
637 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
640 case NETFLOW_IPV4_SRC_ADDR:
641 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
642 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
645 case NETFLOW_IPV4_DST_ADDR:
646 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
647 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
648 my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
650 p += NETFLOW_IPV4_DST_ADDR_SIZE;
653 case NETFLOW_INPUT_SNMP:
654 *((uint16_t *) p) = htons(flow->iif);
655 p += NETFLOW_INPUT_SNMP_SIZE;
658 case NETFLOW_OUTPUT_SNMP:
659 *((uint16_t *) p) = htons(flow->oif);
660 p += NETFLOW_OUTPUT_SNMP_SIZE;
663 case NETFLOW_PKTS_32:
664 *((uint32_t *) p) = htonl(flow->pkts);
665 p += NETFLOW_PKTS_32_SIZE;
668 case NETFLOW_BYTES_32:
669 *((uint32_t *) p) = htonl(flow->size);
670 p += NETFLOW_BYTES_32_SIZE;
673 case NETFLOW_FIRST_SWITCHED:
674 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
675 p += NETFLOW_FIRST_SWITCHED_SIZE;
678 case NETFLOW_LAST_SWITCHED:
679 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
680 p += NETFLOW_LAST_SWITCHED_SIZE;
683 case NETFLOW_L4_SRC_PORT:
684 *((uint16_t *) p) = flow->sp;
685 p += NETFLOW_L4_SRC_PORT_SIZE;
688 case NETFLOW_L4_DST_PORT:
689 *((uint16_t *) p) = flow->dp;
690 p += NETFLOW_L4_DST_PORT_SIZE;
694 *((uint8_t *) p) = flow->proto;
695 p += NETFLOW_PROT_SIZE;
698 case NETFLOW_SRC_TOS:
699 *((uint8_t *) p) = flow->tos;
700 p += NETFLOW_SRC_TOS_SIZE;
703 case NETFLOW_TCP_FLAGS:
704 *((uint8_t *) p) = flow->tcp_flags;
705 p += NETFLOW_TCP_FLAGS_SIZE;
708 case NETFLOW_VERSION:
709 *((uint16_t *) p) = htons(netflow->Version);
710 p += NETFLOW_VERSION_SIZE;
714 *((uint16_t *) p) = htons(emit_count);
715 p += NETFLOW_COUNT_SIZE;
719 *((uint32_t *) p) = htonl(getuptime(&emit_time));
720 p += NETFLOW_UPTIME_SIZE;
723 case NETFLOW_UNIX_SECS:
724 *((uint32_t *) p) = htonl(emit_time.sec);
725 p += NETFLOW_UNIX_SECS_SIZE;
728 case NETFLOW_UNIX_NSECS:
729 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
730 p += NETFLOW_UNIX_NSECS_SIZE;
733 case NETFLOW_FLOW_SEQUENCE:
734 //*((uint32_t *) p) = htonl(emit_sequence);
735 *((uint32_t *) p) = 0;
736 p += NETFLOW_FLOW_SEQUENCE_SIZE;
740 /* Unsupported (uint8_t) */
741 case NETFLOW_ENGINE_TYPE:
742 case NETFLOW_ENGINE_ID:
743 case NETFLOW_FLAGS7_1:
744 case NETFLOW_SRC_MASK:
745 case NETFLOW_DST_MASK:
747 my_log(LOG_CRIT, "Adding SRC/DST masks: this version of fprobe is seriously broken\n");
750 *((uint8_t *) p) = 0;
751 p += NETFLOW_PAD8_SIZE;
754 *((uint32_t *) p) = flow->xid;
755 p += NETFLOW_XID_SIZE;
758 /* Unsupported (uint16_t) */
761 case NETFLOW_FLAGS7_2:
762 *((uint16_t *) p) = 0;
763 p += NETFLOW_PAD16_SIZE;
767 /* Unsupported (uint32_t) */
768 case NETFLOW_IPV4_NEXT_HOP:
769 case NETFLOW_ROUTER_SC:
770 *((uint32_t *) p) = 0;
771 p += NETFLOW_PAD32_SIZE;
775 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
776 format, i, format[i]);
780 #if ((DEBUG) & DEBUG_F)
781 my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
788 Workaround for clone()-based threads
789 Try to change EUID independently of main thread
793 setregid(pw->pw_gid, pw->pw_gid);
794 setreuid(pw->pw_uid, pw->pw_uid);
803 struct timespec timeout;
804 int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
806 p = (void *) &emit_packet + netflow->HeaderSize;
812 pthread_mutex_lock(&emit_mutex);
813 while (!flows_emit) {
814 gettimeofday(&now, 0);
815 timeout.tv_sec = now.tv_sec + emit_timeout;
816 /* Do not wait until emit_packet is filled - that may take too long */
817 if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
818 pthread_mutex_unlock(&emit_mutex);
823 flows_emit = flows_emit->next;
824 #if ((DEBUG) & DEBUG_I)
827 pthread_mutex_unlock(&emit_mutex);
831 gettime(&start_time);
832 start_time.sec -= start_time_offset;
835 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
839 printf("Emit count = %d\n", emit_count);
842 if (emit_count == netflow->MaxFlows) {
845 p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
846 size = netflow->HeaderSize + emit_count * netflow->FlowSize;
847 /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
848 #ifdef STD_NETFLOW_PDU
849 if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
852 for (i = 0; i < npeers; i++) {
853 if (peers[i].type == PEER_FILE) {
854 if (netflow->SeqOffset)
855 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
856 peers[i].write_fd = get_data_file_fd(peers[i].fname, peers[i].write_fd);
857 ret = write(peers[i].write_fd, emit_packet, size);
860 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
861 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
862 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
866 #if ((DEBUG) & DEBUG_E)
868 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
869 emit_count, i + 1, peers[i].seq);
872 peers[i].seq += emit_count;
875 if (emit_rate_bytes) {
877 delay = sent / emit_rate_bytes;
879 sent %= emit_rate_bytes;
881 timeout.tv_nsec = emit_rate_delay * delay;
882 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
887 if (peers[i].type == PEER_MIRROR) goto sendreal;
889 if (peers[i].type == PEER_ROTATE)
890 if (peer_rot_cur++ == peer_rot_work) {
892 if (netflow->SeqOffset)
893 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
894 ret = send(peers[i].write_fd, emit_packet, size, 0);
896 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
897 my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
898 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
901 #if ((DEBUG) & DEBUG_E)
903 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
904 emit_count, i + 1, peers[i].seq);
907 peers[i].seq += emit_count;
910 if (emit_rate_bytes) {
912 delay = sent / emit_rate_bytes;
914 sent %= emit_rate_bytes;
916 timeout.tv_nsec = emit_rate_delay * delay;
917 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
922 if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
923 emit_sequence += emit_count;
925 #if ((DEBUG) & DEBUG_I)
932 void *unpending_thread()
935 struct timespec timeout;
936 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
943 pthread_mutex_lock(&unpending_mutex);
946 while (!(pending_tail->flags & FLOW_PENDING)) {
947 gettimeofday(&now, 0);
948 timeout.tv_sec = now.tv_sec + unpending_timeout;
949 pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
952 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
955 if (put_into(pending_tail, COPY_INTO
956 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
960 #if ((DEBUG) & DEBUG_I)
961 pkts_lost_unpending++;
965 #if ((DEBUG) & DEBUG_U)
966 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
969 pending_tail->flags = 0;
970 pending_tail = pending_tail->next;
971 #if ((DEBUG) & DEBUG_I)
979 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
983 struct Flow *flow, **flowpp;
985 struct timespec timeout;
990 pthread_mutex_lock(&scan_mutex);
994 timeout.tv_sec = now.sec + scan_interval;
995 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
998 #if ((DEBUG) & DEBUG_S)
999 my_log(LOG_DEBUG, "S: %d", now.sec);
1001 for (i = 0; i < 1 << HASH_BITS ; i++) {
1002 pthread_mutex_lock(&flows_mutex[i]);
1006 if (flow->flags & FLOW_FRAG) {
1007 /* Process fragmented flow */
1008 if ((now.sec - flow->mtime.sec) > frag_lifetime) {
1009 /* Fragmented flow expired - put it into special chain */
1010 #if ((DEBUG) & DEBUG_I)
1014 *flowpp = flow->next;
1016 flow->flags &= ~FLOW_FRAG;
1017 flow->next = scan_frag_dreg;
1018 scan_frag_dreg = flow;
1023 /* Flow is not fragmented */
1024 if ((now.sec - flow->mtime.sec) > inactive_lifetime
1025 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
1027 #if ((DEBUG) & DEBUG_S)
1028 my_log(LOG_DEBUG, "S: E %x", flow);
1030 #if ((DEBUG) & DEBUG_I)
1033 *flowpp = flow->next;
1034 pthread_mutex_lock(&emit_mutex);
1035 flow->next = flows_emit;
1037 #if ((DEBUG) & DEBUG_I)
1040 pthread_mutex_unlock(&emit_mutex);
1045 flowpp = &flow->next;
1048 pthread_mutex_unlock(&flows_mutex[i]);
1050 if (flows_emit) pthread_cond_signal(&emit_cond);
1052 while (scan_frag_dreg) {
1053 flow = scan_frag_dreg;
1054 scan_frag_dreg = flow->next;
1055 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1058 put_into(flow, MOVE_INTO
1059 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1063 #if ((DEBUG) & DEBUG_S)
1064 my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
1072 struct ulog_packet_msg *ulog_msg;
1076 int len, off_frag, psize;
1077 #if ((DEBUG) & DEBUG_C)
1086 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1088 my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1091 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1093 #if ((DEBUG) & DEBUG_C)
1094 sprintf(logbuf, "C: %d", ulog_msg->data_len);
1097 nl = (void *) &ulog_msg->payload;
1098 psize = ulog_msg->data_len;
1101 if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1102 #if ((DEBUG) & DEBUG_C)
1103 strcat(logbuf, " U");
1104 my_log(LOG_DEBUG, "%s", logbuf);
1106 #if ((DEBUG) & DEBUG_I)
1112 if (pending_head->flags) {
1113 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1115 # if ((DEBUG) & DEBUG_C)
1120 "pending queue full:", "packet lost");
1122 #if ((DEBUG) & DEBUG_I)
1123 pkts_lost_capture++;
1128 #if ((DEBUG) & DEBUG_I)
1132 flow = pending_head;
1134 /* ?FIXME? Add sanity check for ip_len? */
1135 flow->size = ntohs(nl->ip_len);
1136 #if ((DEBUG) & DEBUG_I)
1137 size_total += flow->size;
1140 flow->sip = nl->ip_src;
1141 flow->dip = nl->ip_dst;
1142 flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1144 /* It's going to be expensive calling this syscall on every flow.
1145 * We should keep a local hash table, for now just bear the overhead... - Sapan*/
1146 if (ulog_msg->mark > 0) {
1147 flow->xid = get_vhi_name(ulog_msg->mark);
1148 challenge = get_vhi_name(ulog_msg->mark);
1151 if (flow->xid < 1 || flow->xid!=challenge)
1152 flow->xid = ulog_msg->mark;
1155 if ((flow->dip.s_addr == inet_addr("64.34.177.39")) || (flow->sip.s_addr == inet_addr("64.34.177.39"))) {
1156 my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->xid);
1158 flow->iif = snmp_index(ulog_msg->indev_name);
1159 flow->oif = snmp_index(ulog_msg->outdev_name);
1160 flow->proto = nl->ip_p;
1162 flow->tcp_flags = 0;
1166 /* Packets captured from the OUTPUT table don't contain a valid timestamp */
1167 if (ulog_msg->timestamp_sec) {
1168 flow->ctime.sec = ulog_msg->timestamp_sec;
1169 flow->ctime.usec = ulog_msg->timestamp_usec;
1170 } else gettime(&flow->ctime);
1171 flow->mtime = flow->ctime;
1173 off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1176 Offset (from network layer) to transport layer header/IP data
1177 IOW IP header size ;-)
1180 Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1182 off_tl = nl->ip_hl << 2;
1183 tl = (void *) nl + off_tl;
1185 /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1186 flow->sizeF = ntohs(nl->ip_len) - off_tl;
1188 if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1189 if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1191 if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1192 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1193 #if ((DEBUG) & DEBUG_C)
1194 strcat(logbuf, " F");
1196 #if ((DEBUG) & DEBUG_I)
1197 pkts_total_fragmented++;
1199 flow->flags |= FLOW_FRAG;
1200 flow->id = nl->ip_id;
1202 if (!(ntohs(nl->ip_off) & IP_MF)) {
1203 /* The packet without IP_MF (the last fragment) lets us compute the whole datagram size */
1204 flow->flags |= FLOW_LASTFRAG;
1205 /* size = frag_offset*8 + data_size */
1206 flow->sizeP = off_frag + flow->sizeF;
1210 #if ((DEBUG) & DEBUG_C)
1211 sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1212 strcat(logbuf, buf);
1213 sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1214 strcat(logbuf, buf);
1218 Fortunately most interesting transport layer information fit
1219 into first 8 bytes of IP data field (minimal nonzero size).
1220 Thus we don't need actual packet reassembling to build whole
1221 transport layer data. We only check the fragment offset for
1222 zero value to find packet with this information.
1224 if (!off_frag && psize >= 8) {
1225 switch (flow->proto) {
1228 flow->sp = ((struct udphdr *)tl)->uh_sport;
1229 flow->dp = ((struct udphdr *)tl)->uh_dport;
1234 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1235 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1238 #ifdef ICMP_TRICK_CISCO
1240 flow->dp = *((int32_t *) tl);
1245 /* Unknown transport layer */
1246 #if ((DEBUG) & DEBUG_C)
1247 strcat(logbuf, " U");
1254 #if ((DEBUG) & DEBUG_C)
1255 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1256 strcat(logbuf, buf);
1258 flow->flags |= FLOW_TL;
1262 /* Check for tcp flags presence (including CWR and ECE). */
1263 if (flow->proto == IPPROTO_TCP
1265 && psize >= 16 - off_frag) {
1266 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1267 #if ((DEBUG) & DEBUG_C)
1268 sprintf(buf, " TCP:%x", flow->tcp_flags);
1269 strcat(logbuf, buf);
1273 #if ((DEBUG) & DEBUG_C)
1274 sprintf(buf, " => %x", (unsigned) flow);
1275 strcat(logbuf, buf);
1276 my_log(LOG_DEBUG, "%s", logbuf);
1279 #if ((DEBUG) & DEBUG_I)
1281 pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1282 if (pending_queue_trace < pending_queue_trace_candidate)
1283 pending_queue_trace = pending_queue_trace_candidate;
1286 /* Flow complete - inform unpending_thread() about it */
1287 pending_head->flags |= FLOW_PENDING;
1288 pending_head = pending_head->next;
1290 pthread_cond_signal(&unpending_cond);
1296 /* Copied out of CoDemux */
1298 static int init_daemon() {
1302 pidfile = fopen(PIDFILE, "w");
1303 if (pidfile == NULL) {
1304 my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
1307 if ((pid = fork()) < 0) {
1309 my_log(LOG_ERR, "Could not fork!\n");
1312 else if (pid != 0) {
1313 /* i'm the parent, writing down the child pid */
1314 fprintf(pidfile, "%u\n", pid);
1319 /* close the pid file */
1322 /* routines for any daemon process
1323 1. create a new session
1324 2. change directory to the root
1325 3. change the file creation permission
1328 chdir("/var/local/fprobe");
1334 int main(int argc, char **argv)
1337 char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1338 int c, i, write_fd, memory_limit = 0;
1339 struct addrinfo hints, *res;
1340 struct sockaddr_in saddr;
1341 pthread_attr_t tattr;
1342 struct sigaction sigact;
1343 static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1344 struct timeval timeout;
1346 sched_min = sched_get_priority_min(SCHED);
1347 sched_max = sched_get_priority_max(SCHED);
1349 memset(&saddr, 0 , sizeof(saddr));
1350 memset(&hints, 0 , sizeof(hints));
1351 hints.ai_flags = AI_PASSIVE;
1352 hints.ai_family = AF_INET;
1353 hints.ai_socktype = SOCK_DGRAM;
1355 /* Process command line options */
1358 while ((c = my_getopt(argc, argv, parms)) != -1) {
1368 if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1369 if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1370 if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1371 if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1372 if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1373 if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1374 if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1375 if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1376 if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1377 if (parms[nflag].count) {
1378 switch (atoi(parms[nflag].arg)) {
1380 netflow = &NetFlow1;
1387 netflow = &NetFlow7;
1391 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1395 if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1396 if (parms[lflag].count) {
1397 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1400 sprintf(errpbuf, "[%s]", log_suffix);
1401 strcat(ident, errpbuf);
1404 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1405 if (log_suffix) *--log_suffix = ':';
1407 if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1409 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1412 sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1413 if (parms[qflag].count) {
1414 pending_queue_length = atoi(parms[qflag].arg);
1415 if (pending_queue_length < 1) {
1416 fprintf(stderr, "Illegal %s\n", "pending queue length");
1420 if (parms[rflag].count) {
1421 schedp.sched_priority = atoi(parms[rflag].arg);
1422 if (schedp.sched_priority
1423 && (schedp.sched_priority < sched_min
1424 || schedp.sched_priority > sched_max)) {
1425 fprintf(stderr, "Illegal %s\n", "realtime priority");
1429 if (parms[Bflag].count) {
1430 sockbufsize = atoi(parms[Bflag].arg) << 10;
1432 if (parms[bflag].count) {
1433 bulk_quantity = atoi(parms[bflag].arg);
1434 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1435 fprintf(stderr, "Illegal %s\n", "bulk size");
1439 if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1440 if (parms[Xflag].count) {
1441 for(i = 0; parms[Xflag].arg[i]; i++)
1442 if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1443 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1445 rule = strtok(parms[Xflag].arg, ":");
1446 for (i = 0; rule; i++) {
1447 snmp_rules[i].len = strlen(rule);
1448 if (snmp_rules[i].len > IFNAMSIZ) {
1449 fprintf(stderr, "Illegal %s\n", "interface basename");
1452 strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1453 if (!*(rule - 1)) *(rule - 1) = ',';
1454 rule = strtok(NULL, ",");
1456 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1459 snmp_rules[i].base = atoi(rule);
1461 rule = strtok(NULL, ":");
1465 if (parms[tflag].count)
1466 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1467 if (parms[aflag].count) {
1468 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1470 fprintf(stderr, "Illegal %s\n", "source address");
1473 saddr = *((struct sockaddr_in *) res->ai_addr);
1477 if (parms[uflag].count)
1478 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1479 fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1484 /* Process collectors parameters. Brrrr... :-[ */
1486 npeers = argc - optind;
1488 /* Send to remote Netflow collector */
1489 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1490 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1492 if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1494 if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1495 fprintf(stderr, "socket(): %s\n", strerror(errno));
1498 peers[npeers].write_fd = write_fd;
1499 peers[npeers].type = PEER_MIRROR;
1500 peers[npeers].laddr = saddr;
1501 peers[npeers].seq = 0;
1502 if ((lhost = strchr(dport, '/'))) {
1504 if ((type = strchr(lhost, '/'))) {
1512 peers[npeers].type = PEER_ROTATE;
1521 if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1522 peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1526 if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1527 sizeof(struct sockaddr_in))) {
1528 fprintf(stderr, "bind(): %s\n", strerror(errno));
1531 if (getaddrinfo(dhost, dport, &hints, &res)) {
1533 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1536 peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1538 if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1539 sizeof(struct sockaddr_in))) {
1540 fprintf(stderr, "connect(): %s\n", strerror(errno));
1544 /* Restore command line */
1545 if (type) *--type = '/';
1546 if (lhost) *--lhost = '/';
1550 else if (parms[fflag].count) {
1552 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1553 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1554 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1556 peers[npeers].write_fd = START_DATA_FD;
1557 peers[npeers].type = PEER_FILE;
1558 peers[npeers].seq = 0;
1567 if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1568 ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1570 fprintf(stderr, "libipulog initialization error: %s",
1571 ipulog_strerror(ipulog_errno));
1575 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1576 &sockbufsize, sizeof(sockbufsize)) < 0)
1577 fprintf(stderr, "setsockopt(): %s", strerror(errno));
1579 /* Daemonize (if log destination stdout-free) */
1581 my_log_open(ident, verbosity, log_dest);
1585 if (!(log_dest & 2)) {
1586 /* Crash-proofing - Sapan*/
1590 fprintf(stderr, "fork(): %s", strerror(errno));
1595 freopen("/dev/null", "r", stdin);
1596 freopen("/dev/null", "w", stdout);
1597 freopen("/dev/null", "w", stderr);
1601 while (wait3(NULL,0,NULL) < 1);
1605 setvbuf(stdout, (char *)0, _IONBF, 0);
1606 setvbuf(stderr, (char *)0, _IONBF, 0);
1610 sprintf(errpbuf, "[%ld]", (long) pid);
1611 strcat(ident, errpbuf);
1613 /* Initialization */
1615 hash_init(); /* Actually for crc16 only */
1616 mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1617 for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1620 /* Hope 12 days is enough :-/ */
1621 start_time_offset = 1 << 20;
1623 /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1625 gettime(&start_time);
1628 Build static pending queue as circular buffer.
1630 if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1631 pending_tail = pending_head;
1632 for (i = pending_queue_length - 1; i--;) {
1633 if (!(pending_tail->next = mem_alloc())) {
1635 my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1638 pending_tail = pending_tail->next;
1640 pending_tail->next = pending_head;
1641 pending_tail = pending_head;
1643 sigemptyset(&sig_mask);
1644 sigact.sa_handler = &sighandler;
1645 sigact.sa_mask = sig_mask;
1646 sigact.sa_flags = 0;
1647 sigaddset(&sig_mask, SIGTERM);
1648 sigaction(SIGTERM, &sigact, 0);
1649 #if ((DEBUG) & DEBUG_I)
1650 sigaddset(&sig_mask, SIGUSR1);
1651 sigaction(SIGUSR1, &sigact, 0);
1653 if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1654 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1658 my_log(LOG_INFO, "Starting %s...", VERSION);
1660 if (parms[cflag].count) {
1661 if (chdir(parms[cflag].arg) || chroot(".")) {
1662 my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1667 schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1668 pthread_attr_init(&tattr);
1669 for (i = 0; i < THREADS - 1; i++) {
1670 if (schedp.sched_priority > 0) {
1671 if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1672 (pthread_attr_setschedparam(&tattr, &schedp))) {
1673 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1677 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1678 my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1681 pthread_detach(thid);
1682 schedp.sched_priority++;
1686 if (setgroups(0, NULL)) {
1687 my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1690 if (setregid(pw->pw_gid, pw->pw_gid)) {
1691 my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1694 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1695 my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1700 if (!(pidfile = fopen(pidfilepath, "w")))
1701 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1703 fprintf(pidfile, "%ld\n", (long) pid);
1707 my_log(LOG_INFO, "pid: %d", pid);
1708 my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1709 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1710 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1711 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1712 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1713 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1714 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1715 for (i = 0; i < nsnmp_rules; i++) {
1716 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1717 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1719 for (i = 0; i < npeers; i++) {
1720 switch (peers[i].type) {
1728 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1729 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1730 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1733 pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1735 timeout.tv_usec = 0;
1737 || (total_elements - free_elements - pending_queue_length)
1739 || pending_tail->flags) {
1742 timeout.tv_sec = scan_interval;
1743 select(0, 0, 0, 0, &timeout);
1746 if (sigs & SIGTERM_MASK && !killed) {
1747 sigs &= ~SIGTERM_MASK;
1748 my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1751 active_lifetime = -1;
1752 inactive_lifetime = -1;
1754 unpending_timeout = 1;
1756 pthread_cond_signal(&scan_cond);
1757 pthread_cond_signal(&unpending_cond);
1760 #if ((DEBUG) & DEBUG_I)
1761 if (sigs & SIGUSR1_MASK) {
1762 sigs &= ~SIGUSR1_MASK;
1767 remove(pidfilepath);
1768 #if ((DEBUG) & DEBUG_I)
1771 my_log(LOG_INFO, "Done.");