2 Copyright (C) Slava Astashonok <sla@0n.ru>
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License.
7 $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
9 Sapan Bhatia <sapanb@cs.princeton.edu>
11 7/11/2007 Added data collection (-f) functionality, xid support in the header and log file
14 15/11/2007 Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
20 /* stdout, stderr, freopen() */
26 /* getopt(), alarm(), getpid(), sedsid(), chdir() */
39 #include <libipulog/libipulog.h>
42 struct ipulog_handle {
45 struct sockaddr_nl local;
46 struct sockaddr_nl peer;
47 struct nlmsghdr* last_nlhdr;
50 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
51 #include <sys/types.h>
52 #include <netinet/in_systm.h>
53 #include <sys/socket.h>
54 #include <netinet/in.h>
55 #include <arpa/inet.h>
56 #include <netinet/ip.h>
57 #include <netinet/tcp.h>
58 #include <netinet/udp.h>
59 #include <netinet/ip_icmp.h>
62 #include <sys/param.h>
87 #include <sys/select.h>
93 #include <fprobe-ulog.h>
95 #include <my_getopt.h>
100 #define PIDFILE "/var/log/fprobe-ulog.pid"
101 #define LAST_EPOCH_FILE "/var/log/fprobe_last_epoch"
102 #define MAX_EPOCH_SIZE sizeof("32767")
103 #define STD_NETFLOW_PDU
133 static struct getopt_parms parms[] = {
134 {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
135 {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
136 {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
137 {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
138 {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
139 {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
140 {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
141 {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
142 {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
143 {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
145 {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
146 {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
148 {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
149 {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
150 {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
151 {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
152 {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
153 {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
154 {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
155 {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
156 {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
157 {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
/* ---- Global state shared by all threads ------------------------------- */
/* getopt() state provided by libc. */
162 extern int optind, opterr, optopt;
/* NetFlow version descriptors (defined in another translation unit). */
165 extern struct NetFlow NetFlow1;
166 extern struct NetFlow NetFlow5;
167 extern struct NetFlow NetFlow7;
/* Sentinel fd meaning "no data file opened yet" (see get_data_file_fd). */
169 #define START_DATA_FD -5
170 #define mark_is_tos parms[Mflag].count
/* Timers/limits, overridable from the command line (see usage text). */
171 static unsigned scan_interval = 5;
172 static unsigned int min_free = 0;
173 static int frag_lifetime = 30;
174 static int inactive_lifetime = 60;
175 static int active_lifetime = 300;
176 static int sockbufsize;
177 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
178 #if (MEM_BITS == 0) || (MEM_BITS == 16)
179 #define BULK_QUANTITY 10000
181 #define BULK_QUANTITY 200
/* Epoch-based log rotation state (-E/-T/-W options). */
184 static unsigned epoch_length=60, log_epochs=1;
185 static unsigned cur_epoch=0,prev_uptime=0,last_peak=0;
187 static unsigned bulk_quantity = BULK_QUANTITY;
188 static unsigned pending_queue_length = 100;
/* Selected export format; NetFlow v5 by default (-n option). */
189 static struct NetFlow *netflow = &NetFlow5;
190 static unsigned verbosity = 6;
191 static unsigned log_dest = MY_LOG_SYSLOG;
192 static struct Time start_time;
193 static long start_time_offset;
/* Memory-pool counters maintained by mem_alloc()/mem_free() elsewhere. */
196 extern unsigned total_elements;
197 extern unsigned free_elements;
198 extern unsigned total_memory;
/* Instrumentation counters, compiled in only with DEBUG_I. */
199 #if ((DEBUG) & DEBUG_I)
200 static unsigned emit_pkts, emit_queue;
201 static uint64_t size_total;
202 static unsigned pkts_total, pkts_total_fragmented;
203 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
204 static unsigned pkts_pending, pkts_pending_done;
205 static unsigned pending_queue_trace, pending_queue_trace_candidate;
206 static unsigned flows_total, flows_fragmented;
/* Emit-thread state: flows per PDU, sequence number, rate limiting. */
208 static unsigned emit_count;
209 static uint32_t emit_sequence;
210 static unsigned emit_rate_bytes, emit_rate_delay;
211 static struct Time emit_time;
212 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
213 static pthread_t thid;
214 static sigset_t sig_mask;
215 static struct sched_param schedp;
216 static int sched_min, sched_max;
217 static int npeers, npeers_rot;
218 static struct peer *peers;
/* Flow hash table with one mutex per bucket. */
221 static struct Flow *flows[1 << HASH_BITS];
222 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
/* cap_thread -> unpending_thread handoff (pending ring). */
224 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
225 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
/* scan_thread wakeup. */
227 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
228 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
229 static struct Flow *pending_head, *pending_tail;
230 static struct Flow *scan_frag_dreg;
/* scan_thread -> emit_thread handoff (expired-flow list). */
232 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
233 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
234 static struct Flow *flows_emit;
235 
236 static char ident[256] = "fprobe-ulog";
237 static FILE *pidfile;
238 static char *pidfilepath;
241 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
/* ULOG capture handle and group mask (-U option). */
242 static struct ipulog_handle *ulog_handle;
243 static uint32_t ulog_gmask = 1;
244 static char *cap_buf;
/* Interface-name -> SNMP-index conversion rules (-X option). */
245 static int nsnmp_rules;
246 static struct snmp_rule *snmp_rules;
/* Target user when dropping privileges (-u option); 0 = stay as is. */
247 static struct passwd *pw = 0;
/* Usage/help text printed by usage().
 * NOTE(review): "idetifier" (line for -l) is a typo for "identifier",
 * and -f is listed twice with different descriptions — both are inside
 * string literals emitted at runtime, so they are flagged here rather
 * than silently changed. */
252 "fprobe-ulog: a NetFlow probe. Version %s\n"
253 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
255 "-h\t\tDisplay this help\n"
256 "-U <mask>\tULOG group bitwise mask [1]\n"
257 "-s <seconds>\tHow often scan for expired flows [5]\n"
258 "-g <seconds>\tFragmented flow lifetime [30]\n"
259 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
260 "-f <filename>\tLog flow data in a file\n"
261 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
262 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
263 "-a <address>\tUse <address> as source for NetFlow flow\n"
264 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
265 "-M\t\tUse netfilter mark value as ToS flag\n"
266 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
267 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
268 "-q <flows>\tPending queue length [100]\n"
269 "-B <kilobytes>\tKernel capture buffer size [0]\n"
270 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
271 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
272 "-c <directory>\tDirectory to chroot to\n"
273 "-u <user>\tUser to run as\n"
274 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
275 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
276 "-y <remote:port>\tAddress of the NetFlow collector\n"
277 "-f <writable file>\tFile to write data into\n"
278 "-T <n>\tRotate log file every n epochs\n"
279 "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
280 "-E <[1..60]>\tSize of an epoch in minutes\n"
281 "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
283 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
/* Dump the DEBUG_I instrumentation counters to the log: packets
 * received/pending, packets ignored or lost, cache/emit statistics and
 * memory-pool usage. Compiled in only when DEBUG_I is enabled. */
287 #if ((DEBUG) & DEBUG_I)
290 my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
291 pkts_total, pkts_total_fragmented, size_total,
292 pkts_pending - pkts_pending_done, pending_queue_trace);
293 my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
294 pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
295 my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
296 flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
297 my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
298 total_elements, free_elements, total_memory);
/* Async-signal handler: records the received signal as a bit in the
 * global `sigs` mask; the worker loops poll the mask and react outside
 * signal context (SIGTERM -> shutdown, SIGUSR1 -> dump debug stats). */
302 void sighandler(int sig)
306 sigs |= SIGTERM_MASK;
308 #if ((DEBUG) & DEBUG_I)
310 sigs |= SIGUSR1_MASK;
/* Fill *now with the current wall-clock time (seconds + microseconds).
 * NOTE(review): `t` is a struct timeval filled by a gettimeofday() call
 * on a line not visible here — confirm against the full source. */
316 void gettime(struct Time *now)
322 now->usec = t.tv_usec;
326 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
328 return (t1->sec - t2->sec)/60;
331 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
333 return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
336 /* Uptime in miliseconds */
337 uint32_t getuptime(struct Time *t)
339 /* Maximum uptime is about 49/2 days */
340 return cmpmtime(t, &start_time);
343 /* Uptime in minutes */
344 uint32_t getuptime_minutes(struct Time *t)
346 /* Maximum uptime is about 49/2 days */
347 return cmpMtime(t, &start_time);
350 hash_t hash_flow(struct Flow *flow)
352 if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
353 else return hash(flow, sizeof(struct Flow_TL));
/* Map an interface name to an SNMP ifIndex. An empty name yields 0.
 * First the user-supplied -X conversion rules are tried (basename
 * prefix match, numeric suffix plus rule base), then the kernel's
 * if_nametoindex() is used as a fallback. */
356 uint16_t snmp_index(char *name) {
359 	if (!*name) return 0;
361 	for (i = 0; (int) i < nsnmp_rules; i++) {
362 		if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
		/* Rule matched: index = numeric suffix of the name + rule base. */
363 		return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
366 	if ((i = if_nametoindex(name))) return i;
/* Copy the accounting fields of one flow record into another.
 * NOTE(review): the address/port members appear to be copied on lines
 * not visible in this listing — verify against the full source. */
371 inline void copy_flow(struct Flow *src, struct Flow *dst)
379 dst->proto = src->proto;
380 dst->tcp_flags = src->tcp_flags;
384 dst->pkts = src->pkts;
385 dst->size = src->size;
386 dst->sizeF = src->sizeF;
387 dst->sizeP = src->sizeP;
388 dst->ctime = src->ctime;
389 dst->mtime = src->mtime;
390 dst->flags = src->flags;
/* Restore the epoch counter persisted by update_cur_epoch_file(), so a
 * restarted probe resumes log rotation where the previous run stopped.
 * Failure to read is non-fatal. */
393 void read_cur_epoch() {
395 /* Reset to -1 in case the read fails */
397 fd = open(LAST_EPOCH_FILE, O_RDONLY);
399 char snum[MAX_EPOCH_SIZE];
401 len = read(fd, snum, MAX_EPOCH_SIZE-1);
/* NOTE(review): cur_epoch is declared unsigned but scanned with %d
 * (mismatched conversion), and snum is only NUL-terminated if a line
 * not visible here does snum[len] = 0 — confirm in the full source. */
404 sscanf(snum,"%d",&cur_epoch);
405 cur_epoch++; /* Let's not stone the last epoch */
413 /* Dumps the current epoch in a file to cope with
414 * reboots and killings of fprobe */
416 void update_cur_epoch_file(int n) {
418 char snum[MAX_EPOCH_SIZE];
419 len=snprintf(snum, MAX_EPOCH_SIZE-1,"%d", n);
420 fd = open(LAST_EPOCH_FILE, O_RDWR|O_CREAT|O_TRUNC);
422 my_log(LOG_ERR, "open() failed: %s.The next restart will resume logging from epoch id 0.",LAST_EPOCH_FILE);
425 write(fd, snum, len);
429 /* Get the file descriptor corresponding to the current file.
430 * The kludgy implementation is to abstract away the 'current
431 * file descriptor', which may also be a socket.
/* Rotates the -f data file on epoch boundaries and polices free disk
 * space (-D). Returns the fd to write to; cur_fd == START_DATA_FD
 * means no file has been opened yet. */
434 unsigned get_data_file_fd(char *fname, int cur_fd) {
438 struct statfs statfs;
441 /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
442 * doesn't solve the problem */
444 cur_uptime = getuptime_minutes(&now);
446 if (cur_fd != START_DATA_FD) {
447 if (fstatfs(cur_fd, &statfs) == -1) {
448 my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
451 if (min_free && (statfs.f_bavail < min_free)
452 && (cur_epoch==last_peak))
454 my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
459 assume that we can reclaim space by overwriting our own files
460 and that the difference in size will not fill the disk - sapan
465 /* If epoch length has been exceeded,
466 * or we're starting up
467 * or we're going back to the first epoch */
/* NOTE(review): cur_epoch is unsigned, so `cur_epoch==-1` relies on
 * the implicit conversion of -1 to UINT_MAX — confirm intent. */
468 if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
469 char nextname[MAX_PATH_LEN];
471 prev_uptime = cur_uptime;
/* Advance the epoch, wrapping to reuse old files after -T epochs. */
472 cur_epoch = (cur_epoch + 1) % log_epochs;
473 if (cur_epoch>last_peak) last_peak = cur_epoch;
476 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
/* NOTE(review): open() uses O_CREAT without the mandatory mode
 * argument — the new file gets garbage permissions; the fchmod()
 * below papers over it but only after the open succeeds. Fix. */
477 if ((write_fd = open(nextname, O_RDWR|O_CREAT|O_TRUNC)) < 0) {
478 my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
481 if (fchmod(write_fd,S_IRUSR|S_IWUSR|S_IROTH|S_IRGRP) == -1) {
482 my_log(LOG_ERR, "fchmod() failed: %s (%s). Continuing...\n", nextname, strerror(errno));
/* Persist the new epoch so a restart continues from here. */
484 update_cur_epoch_file(cur_epoch);
/* Search the hash-bucket chain `where` for a flow matching the key of
 * `what` (source/destination IP and protocol, plus ports for
 * unfragmented flows or the IP id for fragmented ones). If `prev` is
 * non-NULL it is used as the starting link and receives the address of
 * the link pointing at the match, so callers can unlink it. */
493 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
495 struct Flow **flowpp;
501 if (prev) flowpp = *prev;
504 if (where->sip.s_addr == what->sip.s_addr
505 && where->dip.s_addr == what->dip.s_addr
506 && where->proto == what->proto) {
/* Fragmentation state of both records decides which key part applies. */
507 switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
509 /* Both unfragmented */
510 if ((what->sp == where->sp)
511 && (what->dp == where->dp)) goto done;
514 /* Both fragmented */
515 if (where->id == what->id) goto done;
519 flowpp = &where->next;
523 if (prev) *prev = flowpp;
/* Insert `flow` into the hash table, merging it into an existing entry
 * when one matches. flag is COPY_INTO (caller keeps ownership; a new
 * record is allocated) or MOVE_INTO (ownership transfers; `flow` is
 * freed after a merge). A fragmented flow whose fragments are all
 * accounted for is unlinked and re-inserted as a normal flow. */
527 int put_into(struct Flow *flow, int flag
528 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
535 struct Flow *flown, **flowpp;
536 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
541 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
542 sprintf(buf, " %x H:%04x", (unsigned) flow, h);
/* One mutex per hash bucket keeps contention low. */
545 pthread_mutex_lock(&flows_mutex[h]);
547 if (!(flown = find(flows[h], flow, &flowpp))) {
548 /* No suitable flow found - add */
549 if (flag == COPY_INTO) {
550 if ((flown = mem_alloc())) {
551 copy_flow(flow, flown);
554 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
555 my_log(LOG_ERR, "%s %s. %s",
556 "mem_alloc():", strerror(errno), "packet lost");
561 flow->next = flows[h];
563 #if ((DEBUG) & DEBUG_I)
565 if (flow->flags & FLOW_FRAG) flows_fragmented++;
567 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
569 sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
574 /* Found suitable flow - update */
575 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
576 sprintf(buf, " +> %x", (unsigned) flown);
/* Keep the earliest creation time and the latest modification time. */
579 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
580 flown->mtime = flow->mtime;
581 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
582 flown->ctime = flow->ctime;
583 flown->tcp_flags |= flow->tcp_flags;
584 flown->size += flow->size;
585 flown->pkts += flow->pkts;
586 if (flow->flags & FLOW_FRAG) {
587 /* Fragmented flow require some additional work */
588 if (flow->flags & FLOW_TL) {
591 Several packets with FLOW_TL (attack)
593 flown->sp = flow->sp;
594 flown->dp = flow->dp;
596 if (flow->flags & FLOW_LASTFRAG) {
599 Several packets with FLOW_LASTFRAG (attack)
601 flown->sizeP = flow->sizeP;
603 flown->flags |= flow->flags;
604 flown->sizeF += flow->sizeF;
/* Last fragment seen and accumulated size covers the datagram:
 * the flow is reassembled and can graduate to a normal entry. */
605 if ((flown->flags & FLOW_LASTFRAG)
606 && (flown->sizeF >= flown->sizeP)) {
607 /* All fragments received - flow reassembled */
608 *flowpp = flown->next;
609 pthread_mutex_unlock(&flows_mutex[h]);
610 #if ((DEBUG) & DEBUG_I)
615 flown->flags &= ~FLOW_FRAG;
616 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
/* Recursive re-insert under the (possibly different) new bucket. */
619 ret = put_into(flown, MOVE_INTO
620 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
626 if (flag == MOVE_INTO) mem_free(flow);
628 pthread_mutex_unlock(&flows_mutex[h]);
/* Serialize one flow (or, with flow == 0, the packet header) into the
 * emit buffer at `p`, following the field list `format` of the selected
 * NetFlow version. Returns the advanced buffer pointer. */
634 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
638 for (i = 0; i < fields; i++) {
639 #if ((DEBUG) & DEBUG_F)
640 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
643 case NETFLOW_IPV4_SRC_ADDR:
644 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
645 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
648 case NETFLOW_IPV4_DST_ADDR:
649 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
/* NOTE(review): hard-coded test-collector IP; calls inet_addr() on
 * every matching flow — consider hoisting/removing in production. */
650 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
651 my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
653 p += NETFLOW_IPV4_DST_ADDR_SIZE;
656 case NETFLOW_INPUT_SNMP:
657 *((uint16_t *) p) = htons(flow->iif);
658 p += NETFLOW_INPUT_SNMP_SIZE;
661 case NETFLOW_OUTPUT_SNMP:
662 *((uint16_t *) p) = htons(flow->oif);
663 p += NETFLOW_OUTPUT_SNMP_SIZE;
666 case NETFLOW_PKTS_32:
667 *((uint32_t *) p) = htonl(flow->pkts);
668 p += NETFLOW_PKTS_32_SIZE;
671 case NETFLOW_BYTES_32:
672 *((uint32_t *) p) = htonl(flow->size);
673 p += NETFLOW_BYTES_32_SIZE;
676 case NETFLOW_FIRST_SWITCHED:
677 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
678 p += NETFLOW_FIRST_SWITCHED_SIZE;
681 case NETFLOW_LAST_SWITCHED:
682 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
683 p += NETFLOW_LAST_SWITCHED_SIZE;
/* Ports are captured in network byte order already - no htons(). */
686 case NETFLOW_L4_SRC_PORT:
687 *((uint16_t *) p) = flow->sp;
688 p += NETFLOW_L4_SRC_PORT_SIZE;
691 case NETFLOW_L4_DST_PORT:
692 *((uint16_t *) p) = flow->dp;
693 p += NETFLOW_L4_DST_PORT_SIZE;
697 *((uint8_t *) p) = flow->proto;
698 p += NETFLOW_PROT_SIZE;
701 case NETFLOW_SRC_TOS:
702 *((uint8_t *) p) = flow->tos;
703 p += NETFLOW_SRC_TOS_SIZE;
706 case NETFLOW_TCP_FLAGS:
707 *((uint8_t *) p) = flow->tcp_flags;
708 p += NETFLOW_TCP_FLAGS_SIZE;
/* Header-only fields below (flow == 0 for these). */
711 case NETFLOW_VERSION:
712 *((uint16_t *) p) = htons(netflow->Version);
713 p += NETFLOW_VERSION_SIZE;
717 *((uint16_t *) p) = htons(emit_count);
718 p += NETFLOW_COUNT_SIZE;
722 *((uint32_t *) p) = htonl(getuptime(&emit_time));
723 p += NETFLOW_UPTIME_SIZE;
726 case NETFLOW_UNIX_SECS:
727 *((uint32_t *) p) = htonl(emit_time.sec);
728 p += NETFLOW_UNIX_SECS_SIZE;
731 case NETFLOW_UNIX_NSECS:
732 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
733 p += NETFLOW_UNIX_NSECS_SIZE;
736 case NETFLOW_FLOW_SEQUENCE:
/* NOTE(review): the sequence number is deliberately zeroed here;
 * the real counter (emit_sequence) is commented out — confirm. */
737 //*((uint32_t *) p) = htonl(emit_sequence);
738 *((uint32_t *) p) = 0;
739 p += NETFLOW_FLOW_SEQUENCE_SIZE;
743 /* Unsupported (uint8_t) */
744 case NETFLOW_ENGINE_TYPE:
745 case NETFLOW_ENGINE_ID:
746 case NETFLOW_FLAGS7_1:
747 case NETFLOW_SRC_MASK:
748 case NETFLOW_DST_MASK:
750 my_log(LOG_CRIT, "Adding SRC/DST masks: this version of fprobe is seriously broken\n");
753 *((uint8_t *) p) = 0;
754 p += NETFLOW_PAD8_SIZE;
757 *((uint32_t *) p) = flow->xid;
758 p += NETFLOW_XID_SIZE;
761 /* Unsupported (uint16_t) */
764 case NETFLOW_FLAGS7_2:
765 *((uint16_t *) p) = 0;
766 p += NETFLOW_PAD16_SIZE;
770 /* Unsupported (uint32_t) */
771 case NETFLOW_IPV4_NEXT_HOP:
772 case NETFLOW_ROUTER_SC:
773 *((uint32_t *) p) = 0;
774 p += NETFLOW_PAD32_SIZE;
778 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
779 format, i, format[i]);
783 #if ((DEBUG) & DEBUG_F)
784 my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
/* emit_thread() body: drains the flows_emit list, packs flows into a
 * NetFlow PDU and delivers it to each configured peer (file, mirror or
 * rotate), with optional pacing via -t <B:N>. */
791 Workaround for clone()-based threads
792 Try to change EUID independently of main thread
/* Drop privileges in this thread too (per-thread credentials under
 * LinuxThreads/clone). */
796 setregid(pw->pw_gid, pw->pw_gid);
797 setreuid(pw->pw_uid, pw->pw_uid);
806 struct timespec timeout;
807 int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
809 p = (void *) &emit_packet + netflow->HeaderSize;
815 pthread_mutex_lock(&emit_mutex);
816 while (!flows_emit) {
817 gettimeofday(&now, 0);
818 timeout.tv_sec = now.tv_sec + emit_timeout;
819 /* Do not wait until emit_packet will filled - it may be too long */
/* Timed out with a partially filled PDU: flush it anyway. */
820 if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
821 pthread_mutex_unlock(&emit_mutex);
826 flows_emit = flows_emit->next;
827 #if ((DEBUG) & DEBUG_I)
830 pthread_mutex_unlock(&emit_mutex);
834 gettime(&start_time);
835 start_time.sec -= start_time_offset;
838 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
/* NOTE(review): stray stdout debug print — remove or gate on DEBUG. */
842 printf("Emit count = %d\n", emit_count);
845 if (emit_count == netflow->MaxFlows) {
848 p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
849 size = netflow->HeaderSize + emit_count * netflow->FlowSize;
850 /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
851 #ifdef STD_NETFLOW_PDU
852 if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
855 for (i = 0; i < npeers; i++) {
856 if (peers[i].type == PEER_FILE) {
857 if (netflow->SeqOffset)
/* NOTE(review): uses peers[0].seq while updating peers[i].seq
 * below — looks like it should be peers[i].seq; confirm. */
858 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
859 peers[i].write_fd = get_data_file_fd(peers[i].fname, peers[i].write_fd);
860 ret = write(peers[i].write_fd, emit_packet, size);
863 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
864 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
865 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
869 #if ((DEBUG) & DEBUG_E)
871 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
872 emit_count, i + 1, peers[i].seq);
875 peers[i].seq += emit_count;
/* Optional pacing: sleep N ns per B bytes written (-t option). */
878 if (emit_rate_bytes) {
880 delay = sent / emit_rate_bytes;
882 sent %= emit_rate_bytes;
884 timeout.tv_nsec = emit_rate_delay * delay;
885 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
890 if (peers[i].type == PEER_MIRROR) goto sendreal;
/* ROTATE peers share the load round-robin: only one gets each PDU. */
892 if (peers[i].type == PEER_ROTATE)
893 if (peer_rot_cur++ == peer_rot_work) {
895 if (netflow->SeqOffset)
896 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
897 ret = send(peers[i].write_fd, emit_packet, size, 0);
899 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
900 my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
901 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
904 #if ((DEBUG) & DEBUG_E)
906 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
907 emit_count, i + 1, peers[i].seq);
910 peers[i].seq += emit_count;
913 if (emit_rate_bytes) {
915 delay = sent / emit_rate_bytes;
917 sent %= emit_rate_bytes;
919 timeout.tv_nsec = emit_rate_delay * delay;
920 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
925 if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
926 emit_sequence += emit_count;
928 #if ((DEBUG) & DEBUG_I)
/* Worker that moves completed captures from the pending ring into the
 * flow hash table. Sleeps on unpending_cond (with a timeout) until
 * cap_thread marks the tail entry FLOW_PENDING, then copies it into
 * the table and recycles the ring slot. */
935 void *unpending_thread()
938 struct timespec timeout;
939 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
946 pthread_mutex_lock(&unpending_mutex);
949 while (!(pending_tail->flags & FLOW_PENDING)) {
950 gettimeofday(&now, 0);
951 timeout.tv_sec = now.tv_sec + unpending_timeout;
952 pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
955 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
/* COPY_INTO: the ring slot stays owned by the ring; on failure the
 * packet is counted as lost. */
958 if (put_into(pending_tail, COPY_INTO
959 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
963 #if ((DEBUG) & DEBUG_I)
964 pkts_lost_unpending++;
968 #if ((DEBUG) & DEBUG_U)
969 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
/* Release the slot back to cap_thread and advance the tail. */
972 pending_tail->flags = 0;
973 pending_tail = pending_tail->next;
974 #if ((DEBUG) & DEBUG_I)
/* scan_thread() body: periodically (scan_interval) walks every hash
 * bucket, expiring fragmented flows into scan_frag_dreg and inactive or
 * too-long-active flows into the flows_emit list for emit_thread. */
982 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
986 struct Flow *flow, **flowpp;
988 struct timespec timeout;
993 pthread_mutex_lock(&scan_mutex);
997 timeout.tv_sec = now.sec + scan_interval;
998 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
1001 #if ((DEBUG) & DEBUG_S)
1002 my_log(LOG_DEBUG, "S: %d", now.sec);
1004 for (i = 0; i < 1 << HASH_BITS ; i++) {
1005 pthread_mutex_lock(&flows_mutex[i]);
1009 if (flow->flags & FLOW_FRAG) {
1010 /* Process fragmented flow */
1011 if ((now.sec - flow->mtime.sec) > frag_lifetime) {
1012 /* Fragmented flow expired - put it into special chain */
1013 #if ((DEBUG) & DEBUG_I)
1017 *flowpp = flow->next;
1019 flow->flags &= ~FLOW_FRAG;
1020 flow->next = scan_frag_dreg;
1021 scan_frag_dreg = flow;
1026 /* Flow is not fragmented */
/* Expire on inactivity or when the active timer is exceeded. */
1027 if ((now.sec - flow->mtime.sec) > inactive_lifetime
1028 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
1030 #if ((DEBUG) & DEBUG_S)
1031 my_log(LOG_DEBUG, "S: E %x", flow);
1033 #if ((DEBUG) & DEBUG_I)
/* Unlink from the bucket and prepend onto the emit list. */
1036 *flowpp = flow->next;
1037 pthread_mutex_lock(&emit_mutex);
1038 flow->next = flows_emit;
1040 #if ((DEBUG) & DEBUG_I)
1043 pthread_mutex_unlock(&emit_mutex);
1048 flowpp = &flow->next;
1051 pthread_mutex_unlock(&flows_mutex[i]);
1053 if (flows_emit) pthread_cond_signal(&emit_cond);
/* Re-insert expired fragments as normal flows (MOVE_INTO: ownership
 * transfers, the record is freed on merge). */
1055 while (scan_frag_dreg) {
1056 flow = scan_frag_dreg;
1057 scan_frag_dreg = flow->next;
1058 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1061 put_into(flow, MOVE_INTO
1062 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1066 #if ((DEBUG) & DEBUG_S)
1067 my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
/* cap_thread() body: reads packets from the netfilter ULOG socket,
 * parses the IPv4 (and partial transport) headers into the next slot of
 * the pending ring, and signals unpending_thread. Non-IPv4 and
 * ring-full packets are dropped (counted under DEBUG_I). */
1075 struct ulog_packet_msg *ulog_msg;
1079 int len, off_frag, psize;
1080 #if ((DEBUG) & DEBUG_C)
1089 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1091 my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
/* One read may carry several ULOG messages - iterate them all. */
1094 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1096 #if ((DEBUG) & DEBUG_C)
1097 sprintf(logbuf, "C: %d", ulog_msg->data_len);
1100 nl = (void *) &ulog_msg->payload;
1101 psize = ulog_msg->data_len;
/* Ignore anything that is not a complete IPv4 header. */
1104 if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1105 #if ((DEBUG) & DEBUG_C)
1106 strcat(logbuf, " U");
1107 my_log(LOG_DEBUG, "%s", logbuf);
1109 #if ((DEBUG) & DEBUG_I)
/* Ring slot still owned by unpending_thread -> queue is full. */
1115 if (pending_head->flags) {
1116 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1118 # if ((DEBUG) & DEBUG_C)
1123 "pending queue full:", "packet lost");
1125 #if ((DEBUG) & DEBUG_I)
1126 pkts_lost_capture++;
1131 #if ((DEBUG) & DEBUG_I)
1135 flow = pending_head;
1137 /* ?FIXME? Add sanity check for ip_len? */
1138 flow->size = ntohs(nl->ip_len);
1139 #if ((DEBUG) & DEBUG_I)
1140 size_total += flow->size;
1143 flow->sip = nl->ip_src;
1144 flow->dip = nl->ip_dst;
/* -M option: the netfilter mark doubles as the ToS byte. */
1145 flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1147 /* It's going to be expensive calling this syscall on every flow.
1148 * We should keep a local hash table, for now just bear the overhead... - Sapan*/
/* Resolve the slice/context id (xid) from the mark; read twice and
 * fall back to the raw mark if the answers disagree or are invalid. */
1149 if (ulog_msg->mark > 0) {
1150 flow->xid = get_vhi_name(ulog_msg->mark);
1151 challenge = get_vhi_name(ulog_msg->mark);
1154 if (flow->xid < 1 || flow->xid!=challenge)
1155 flow->xid = ulog_msg->mark;
/* NOTE(review): hard-coded test address, same as in fill(). */
1158 if ((flow->dip.s_addr == inet_addr("64.34.177.39")) || (flow->sip.s_addr == inet_addr("64.34.177.39"))) {
1159 my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->xid);
1161 flow->iif = snmp_index(ulog_msg->indev_name);
1162 flow->oif = snmp_index(ulog_msg->outdev_name);
1163 flow->proto = nl->ip_p;
1165 flow->tcp_flags = 0;
1169 /* Packets captured from the OUTPUT table don't contain a valid timestamp */
1170 if (ulog_msg->timestamp_sec) {
1171 flow->ctime.sec = ulog_msg->timestamp_sec;
1172 flow->ctime.usec = ulog_msg->timestamp_usec;
1173 } else gettime(&flow->ctime);
1174 flow->mtime = flow->ctime;
/* Fragment offset is stored in 8-byte units. */
1176 off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1179 Offset (from network layer) to transport layer header/IP data
1180 IOW IP header size ;-)
1183 Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1185 off_tl = nl->ip_hl << 2;
1186 tl = (void *) nl + off_tl;
1188 /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1189 flow->sizeF = ntohs(nl->ip_len) - off_tl;
1191 if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1192 if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1194 if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1195 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1196 #if ((DEBUG) & DEBUG_C)
1197 strcat(logbuf, " F");
1199 #if ((DEBUG) & DEBUG_I)
1200 pkts_total_fragmented++;
1202 flow->flags |= FLOW_FRAG;
1203 flow->id = nl->ip_id;
/* The last fragment (IP_MF clear) fixes the total datagram size. */
1205 if (!(ntohs(nl->ip_off) & IP_MF)) {
1206 /* Packet with IP_MF clear carries information about the whole datagram size */
1207 flow->flags |= FLOW_LASTFRAG;
1208 /* size = frag_offset*8 + data_size */
1209 flow->sizeP = off_frag + flow->sizeF;
1213 #if ((DEBUG) & DEBUG_C)
1214 sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1215 strcat(logbuf, buf);
1216 sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1217 strcat(logbuf, buf);
1221 Fortunately most interesting transport layer information fit
1222 into first 8 bytes of IP data field (minimal nonzero size).
1223 Thus we don't need actual packet reassembling to build whole
1224 transport layer data. We only check the fragment offset for
1225 zero value to find packet with this information.
1227 if (!off_frag && psize >= 8) {
1228 switch (flow->proto) {
1231 flow->sp = ((struct udphdr *)tl)->uh_sport;
1232 flow->dp = ((struct udphdr *)tl)->uh_dport;
/* ICMP: encode type/code into the NetFlow port fields. */
1237 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1238 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1241 #ifdef ICMP_TRICK_CISCO
1243 flow->dp = *((int32_t *) tl);
1248 /* Unknown transport layer */
1249 #if ((DEBUG) & DEBUG_C)
1250 strcat(logbuf, " U");
1257 #if ((DEBUG) & DEBUG_C)
1258 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1259 strcat(logbuf, buf);
1261 flow->flags |= FLOW_TL;
1265 /* Check for tcp flags presence (including CWR and ECE). */
1266 if (flow->proto == IPPROTO_TCP
1268 && psize >= 16 - off_frag) {
/* TCP flags live at byte 13 of the TCP header. */
1269 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1270 #if ((DEBUG) & DEBUG_C)
1271 sprintf(buf, " TCP:%x", flow->tcp_flags);
1272 strcat(logbuf, buf);
1276 #if ((DEBUG) & DEBUG_C)
1277 sprintf(buf, " => %x", (unsigned) flow);
1278 strcat(logbuf, buf);
1279 my_log(LOG_DEBUG, "%s", logbuf);
1282 #if ((DEBUG) & DEBUG_I)
1284 pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1285 if (pending_queue_trace < pending_queue_trace_candidate)
1286 pending_queue_trace = pending_queue_trace_candidate;
1289 /* Flow complete - inform unpending_thread() about it */
1290 pending_head->flags |= FLOW_PENDING;
1291 pending_head = pending_head->next;
1293 pthread_cond_signal(&unpending_cond);
1299 /* Copied out of CoDemux */
/* Daemonize: write the child PID to PIDFILE from the parent, then (in
 * the child) detach from the controlling terminal and move to the
 * working directory. Returns non-zero on failure (per visible paths). */
1301 static int init_daemon() {
1305 pidfile = fopen(PIDFILE, "w");
1306 if (pidfile == NULL) {
1307 my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
1310 if ((pid = fork()) < 0) {
1312 my_log(LOG_ERR, "Could not fork!\n");
1315 else if (pid != 0) {
1316 /* i'm the parent, writing down the child pid */
1317 fprintf(pidfile, "%u\n", pid);
1322 /* close the pid file */
1325 /* routines for any daemon process
1326 1. create a new session
1327 2. change directory to the root
1328 3. change the file creation permission
/* NOTE(review): chdir() return value is ignored; the comment above
 * says "root" but the target is /var/local/fprobe — confirm intent. */
1331 chdir("/var/local/fprobe");
1337 int main(int argc, char **argv)
1340 char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1341 int c, i, write_fd, memory_limit = 0;
1342 struct addrinfo hints, *res;
1343 struct sockaddr_in saddr;
1344 pthread_attr_t tattr;
1345 struct sigaction sigact;
1346 static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1347 struct timeval timeout;
1349 sched_min = sched_get_priority_min(SCHED);
1350 sched_max = sched_get_priority_max(SCHED);
1352 memset(&saddr, 0 , sizeof(saddr));
1353 memset(&hints, 0 , sizeof(hints));
1354 hints.ai_flags = AI_PASSIVE;
1355 hints.ai_family = AF_INET;
1356 hints.ai_socktype = SOCK_DGRAM;
1358 /* Process command line options */
1361 while ((c = my_getopt(argc, argv, parms)) != -1) {
1371 if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1372 if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1373 if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1374 if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1375 if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1376 if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1377 if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1378 if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1379 if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1380 if (parms[nflag].count) {
1381 switch (atoi(parms[nflag].arg)) {
1383 netflow = &NetFlow1;
1390 netflow = &NetFlow7;
1394 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1398 if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1399 if (parms[lflag].count) {
/* NOTE(review): this span is the interior of main(); its opening and many
 * intermediate lines (closing braces, else-arms, error paths) are elided
 * from this view. Comments below describe only what the visible lines show;
 * anything inferred is marked as an assumption to confirm. */

/* -l: optional "dest:suffix" — suffix is appended to ident as "[suffix]",
 * numeric prefix selects log destination. atoi() gives 0 on garbage —
 * NOTE(review): no error detection; strtol would report bad input. */
1400 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1403 sprintf(errpbuf, "[%s]", log_suffix);
1404 strcat(ident, errpbuf);
1407 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
/* Restore the ':' that strchr/split removed, keeping argv intact for ps. */
1408 if (log_suffix) *--log_suffix = ':';
/* Build "<PID_DIR>/<ident>.pid"; size = dir + '/' + ident + '.' + "pid" + NUL. */
1410 if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1412 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1415 sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
/* -q: pending queue length; must be >= 1. */
1416 if (parms[qflag].count) {
1417 pending_queue_length = atoi(parms[qflag].arg);
1418 if (pending_queue_length < 1) {
1419 fprintf(stderr, "Illegal %s\n", "pending queue length");
/* -r: realtime scheduling priority; 0 disables, otherwise must fall in
 * [sched_min, sched_max] (presumably from sched_get_priority_* — confirm). */
1423 if (parms[rflag].count) {
1424 schedp.sched_priority = atoi(parms[rflag].arg);
1425 if (schedp.sched_priority
1426 && (schedp.sched_priority < sched_min
1427 || schedp.sched_priority > sched_max)) {
1428 fprintf(stderr, "Illegal %s\n", "realtime priority");
/* -B: receive socket buffer size, given in KiB (<< 10). */
1432 if (parms[Bflag].count) {
1433 sockbufsize = atoi(parms[Bflag].arg) << 10;
/* -b: memory allocator bulk quantity, bounded by BULK_QUANTITY_MAX. */
1435 if (parms[bflag].count) {
1436 bulk_quantity = atoi(parms[bflag].arg);
1437 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1438 fprintf(stderr, "Illegal %s\n", "bulk size");
/* -m: memory limit in KiB. */
1442 if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
/* -X: SNMP interface-numbering rules, "basename:base" pairs.
 * Rule count is estimated by counting ':' separators in the argument. */
1443 if (parms[Xflag].count) {
1444 for(i = 0; parms[Xflag].arg[i]; i++)
1445 if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1446 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
/* Tokenize alternating on ':' (basename) and ',' (next pair's start).
 * NOTE(review): strtok mutates the argument and is not thread-safe;
 * the *(rule - 1) writes below restore separators consumed by strtok. */
1448 rule = strtok(parms[Xflag].arg, ":");
1449 for (i = 0; rule; i++) {
1450 snmp_rules[i].len = strlen(rule);
1451 if (snmp_rules[i].len > IFNAMSIZ) {
1452 fprintf(stderr, "Illegal %s\n", "interface basename");
/* NOTE(review): strncpy with exactly len bytes does not NUL-terminate
 * basename unless the struct field is pre-zeroed — confirm at its decl. */
1455 strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1456 if (!*(rule - 1)) *(rule - 1) = ',';
1457 rule = strtok(NULL, ",");
1459 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1462 snmp_rules[i].base = atoi(rule);
1464 rule = strtok(NULL, ":");
/* -t: emit rate limiting as "bytes:delay". */
1468 if (parms[tflag].count)
1469 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
/* -a: local source address used when binding collector sockets. */
1470 if (parms[aflag].count) {
1471 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1473 fprintf(stderr, "Illegal %s\n", "source address");
1476 saddr = *((struct sockaddr_in *) res->ai_addr);
/* -u: unprivileged user to drop to after startup.
 * NOTE(review): errno here may be stale from an earlier call; getpwnam
 * only sets errno on some failures — "Unknown user" fallback covers it. */
1480 if (parms[uflag].count)
1481 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1482 fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1487 /* Process collectors parameters. Brrrr... :-[ */
1489 npeers = argc - optind;
1491 /* Send to remote Netflow collector */
/* One peer per remaining argv entry: "host:port[/lhost[/type]]".
 * Each gets its own connected UDP socket. */
1492 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1493 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1495 if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1497 if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1498 fprintf(stderr, "socket(): %s\n", strerror(errno));
1501 peers[npeers].write_fd = write_fd;
1502 peers[npeers].type = PEER_MIRROR;
1503 peers[npeers].laddr = saddr;
1504 peers[npeers].seq = 0;
/* Optional "/lhost" and second "/type" suffixes on the collector spec. */
1505 if ((lhost = strchr(dport, '/'))) {
1507 if ((type = strchr(lhost, '/'))) {
1515 peers[npeers].type = PEER_ROTATE;
1524 if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1525 peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
/* Bind to the chosen local address, then connect to the collector so
 * plain write()/send() can be used later. */
1529 if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1530 sizeof(struct sockaddr_in))) {
1531 fprintf(stderr, "bind(): %s\n", strerror(errno));
1534 if (getaddrinfo(dhost, dport, &hints, &res)) {
1536 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1539 peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1541 if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1542 sizeof(struct sockaddr_in))) {
1543 fprintf(stderr, "connect(): %s\n", strerror(errno));
1547 /* Restore command line */
1548 if (type) *--type = '/';
1549 if (lhost) *--lhost = '/';
/* -f: write flows to a file instead of any network collector.
 * NOTE(review): malloc(strnlen(...)) lacks +1 for the NUL, and the
 * following strncpy is bounded by MAX_PATH_LEN, not the allocation —
 * looks like an off-by-one/overflow; confirm and prefer snprintf. */
1553 else if (parms[fflag].count) {
1555 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1556 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1557 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1559 peers[npeers].write_fd = START_DATA_FD;
1560 peers[npeers].type = PEER_FILE;
1561 peers[npeers].seq = 0;
/* Capture buffer and ULOG netlink handle (group mask from -U presumably). */
1570 if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1571 ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1573 fprintf(stderr, "libipulog initialization error: %s",
1574 ipulog_strerror(ipulog_errno));
/* Enlarge the netlink socket's kernel receive buffer if -B was given. */
1578 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1579 &sockbufsize, sizeof(sockbufsize)) < 0)
1580 fprintf(stderr, "setsockopt(): %s", strerror(errno));
1582 /* Daemonize (if log destination stdout-free) */
1584 my_log_open(ident, verbosity, log_dest);
/* log_dest bit 1 appears to mean "log to stdout"; when clear, fork and
 * detach the standard streams. NOTE(review): freopen returns unchecked. */
1588 if (!(log_dest & 2)) {
1589 /* Crash-proofing - Sapan*/
1593 fprintf(stderr, "fork(): %s", strerror(errno));
1598 freopen("/dev/null", "r", stdin);
1599 freopen("/dev/null", "w", stdout);
1600 freopen("/dev/null", "w", stderr);
/* Parent loops reaping children — crash-proof respawn supervisor,
 * per the comment above; exact respawn logic is elided from this view. */
1604 while (wait3(NULL,0,NULL) < 1);
/* Foreground mode: unbuffered stdout/stderr for timely log output. */
1608 setvbuf(stdout, (char *)0, _IONBF, 0);
1609 setvbuf(stderr, (char *)0, _IONBF, 0);
/* Append "[pid]" to ident for log/pidfile naming. */
1613 sprintf(errpbuf, "[%ld]", (long) pid);
1614 strcat(ident, errpbuf);
1616 /* Initialization */
1618 hash_init(); /* Actually for crc16 only */
1619 mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
/* One mutex per hash bucket (2^HASH_BITS buckets). */
1620 for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1623 /* Hope 12 days is enough :-/ */
1624 start_time_offset = 1 << 20;
1626 /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1628 gettime(&start_time);
1631 Build static pending queue as circular buffer.
/* Preallocate pending_queue_length nodes and link them into a ring;
 * head == tail means empty at start. */
1633 if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1634 pending_tail = pending_head;
1635 for (i = pending_queue_length - 1; i--;) {
1636 if (!(pending_tail->next = mem_alloc())) {
1638 my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1641 pending_tail = pending_tail->next;
1643 pending_tail->next = pending_head;
1644 pending_tail = pending_head;
/* Install SIGTERM (and SIGUSR1 under DEBUG_I) handler, then block the
 * mask so worker threads inherit it blocked; main unblocks later. */
1646 sigemptyset(&sig_mask);
1647 sigact.sa_handler = &sighandler;
1648 sigact.sa_mask = sig_mask;
1649 sigact.sa_flags = 0;
1650 sigaddset(&sig_mask, SIGTERM);
1651 sigaction(SIGTERM, &sigact, 0);
1652 #if ((DEBUG) & DEBUG_I)
1653 sigaddset(&sig_mask, SIGUSR1);
1654 sigaction(SIGUSR1, &sigact, 0);
1656 if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1657 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1661 my_log(LOG_INFO, "Starting %s...", VERSION);
/* -c: chroot into the given directory (must happen before priv drop). */
1663 if (parms[cflag].count) {
1664 if (chdir(parms[cflag].arg) || chroot(".")) {
1665 my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
/* Spawn THREADS-1 workers, each with increasing realtime priority when
 * -r was given (priority ladder starts THREADS-2 below the requested). */
1670 schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1671 pthread_attr_init(&tattr);
1672 for (i = 0; i < THREADS - 1; i++) {
1673 if (schedp.sched_priority > 0) {
1674 if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1675 (pthread_attr_setschedparam(&tattr, &schedp))) {
1676 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1680 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1681 my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1684 pthread_detach(thid);
1685 schedp.sched_priority++;
/* Drop privileges: supplementary groups, then gid, then uid — in the
 * only order that works (uid last). */
1689 if (setgroups(0, NULL)) {
1690 my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1693 if (setregid(pw->pw_gid, pw->pw_gid)) {
1694 my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1697 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1698 my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
/* Pidfile write is best-effort (LOG_ERR, not fatal). */
1703 if (!(pidfile = fopen(pidfilepath, "w")))
1704 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1706 fprintf(pidfile, "%ld\n", (long) pid);
/* Log the effective configuration, SNMP rules, and collector list.
 * NOTE(review): "%d" with pid_t at 1710 assumes pid_t fits int — confirm. */
1710 my_log(LOG_INFO, "pid: %d", pid);
1711 my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1712 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1713 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1714 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1715 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1716 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1717 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1718 for (i = 0; i < nsnmp_rules; i++) {
1719 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1720 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1722 for (i = 0; i < npeers; i++) {
1723 switch (peers[i].type) {
/* inet_ntoa uses a static buffer, hence the snprintf copy before the
 * second inet_ntoa call in the same my_log argument list. */
1731 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1732 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1733 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
/* Main loop: unblock signals, then periodically wake (scan_interval via
 * select-as-sleep) while flows remain outstanding; loop header elided. */
1736 pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1738 timeout.tv_usec = 0;
1740 || (total_elements - free_elements - pending_queue_length)
1742 || pending_tail->flags) {
1745 timeout.tv_sec = scan_interval;
1746 select(0, 0, 0, 0, &timeout);
/* SIGTERM: flush the flow cache by expiring everything immediately and
 * kicking the scan/unpending worker threads. */
1749 if (sigs & SIGTERM_MASK && !killed) {
1750 sigs &= ~SIGTERM_MASK;
1751 my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1754 active_lifetime = -1;
1755 inactive_lifetime = -1;
1757 unpending_timeout = 1;
1759 pthread_cond_signal(&scan_cond);
1760 pthread_cond_signal(&unpending_cond);
1763 #if ((DEBUG) & DEBUG_I)
/* SIGUSR1 (debug builds): dump internal stats — body elided here. */
1764 if (sigs & SIGUSR1_MASK) {
1765 sigs &= ~SIGUSR1_MASK;
/* Shutdown: remove pidfile and log completion. */
1770 remove(pidfilepath);
1771 #if ((DEBUG) & DEBUG_I)
1774 my_log(LOG_INFO, "Done.");