2 Copyright (C) Slava Astashonok <sla@0n.ru>
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License.
7 $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
9 Sapan Bhatia <sapanb@cs.princeton.edu>
11 7/11/2007 Added data collection (-f) functionality, slice_id support in the header and log file
14 15/11/2007 Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
20 /* stdout, stderr, freopen() */
26 /* getopt(), alarm(), getpid(), setsid(), chdir() */
39 #include <libipulog/libipulog.h>
42 struct ipulog_handle {
45 struct sockaddr_nl local;
46 struct sockaddr_nl peer;
47 struct nlmsghdr* last_nlhdr;
50 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
51 #include <sys/types.h>
52 #include <netinet/in_systm.h>
53 #include <sys/socket.h>
54 #include <netinet/in.h>
55 #include <arpa/inet.h>
56 #include <netinet/ip.h>
57 #include <netinet/tcp.h>
58 #include <netinet/udp.h>
59 #include <netinet/ip_icmp.h>
62 #include <sys/param.h>
87 #include <sys/select.h>
93 #include <fprobe-ulog.h>
95 #include <my_getopt.h>
100 #define PIDFILE "/var/log/fprobe-ulog.pid"
101 #define LAST_EPOCH_FILE "/var/log/fprobe_last_epoch"
102 #define MAX_EPOCH_SIZE sizeof("32767")
103 #define STD_NETFLOW_PDU
133 static struct getopt_parms parms[] = {
134 {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
135 {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
136 {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
137 {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
138 {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
139 {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
140 {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
141 {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
142 {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
143 {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
145 {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
146 {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
148 {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
149 {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
150 {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
151 {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
152 {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
153 {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
154 {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
155 {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
156 {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
157 {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
162 extern int optind, opterr, optopt;
165 extern struct NetFlow NetFlow1;
166 extern struct NetFlow NetFlow5;
167 extern struct NetFlow NetFlow7;
169 #define START_DATA_FD -5
170 #define mark_is_tos parms[Mflag].count
171 static unsigned scan_interval = 5;
172 static unsigned int min_free = 0;
173 static int frag_lifetime = 30;
174 static int inactive_lifetime = 60;
175 static int active_lifetime = 300;
176 static int sockbufsize;
177 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
178 #if (MEM_BITS == 0) || (MEM_BITS == 16)
179 #define BULK_QUANTITY 10000
181 #define BULK_QUANTITY 200
184 static unsigned epoch_length=60, log_epochs=1;
185 static unsigned cur_epoch=0,prev_uptime=0,last_peak=0;
187 static unsigned bulk_quantity = BULK_QUANTITY;
188 static unsigned pending_queue_length = 100;
189 static struct NetFlow *netflow = &NetFlow5;
190 static unsigned verbosity = 6;
191 static unsigned log_dest = MY_LOG_SYSLOG;
192 static struct Time start_time;
193 static long start_time_offset;
196 extern unsigned total_elements;
197 extern unsigned free_elements;
198 extern unsigned total_memory;
199 #if ((DEBUG) & DEBUG_I)
200 static unsigned emit_pkts, emit_queue;
201 static uint64_t size_total;
202 static unsigned pkts_total, pkts_total_fragmented;
203 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
204 static unsigned pkts_pending, pkts_pending_done;
205 static unsigned pending_queue_trace, pending_queue_trace_candidate;
206 static unsigned flows_total, flows_fragmented;
208 static unsigned emit_count;
209 static uint32_t emit_sequence;
210 static unsigned emit_rate_bytes, emit_rate_delay;
211 static struct Time emit_time;
212 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
213 static pthread_t thid;
214 static sigset_t sig_mask;
215 static struct sched_param schedp;
216 static int sched_min, sched_max;
217 static int npeers, npeers_rot;
218 static struct peer *peers;
221 static struct Flow *flows[1 << HASH_BITS];
222 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
224 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
225 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
227 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
228 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
229 static struct Flow *pending_head, *pending_tail;
230 static struct Flow *scan_frag_dreg;
232 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
233 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
234 static struct Flow *flows_emit;
236 static char ident[256] = "fprobe-ulog";
237 static FILE *pidfile;
238 static char *pidfilepath;
241 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
242 static struct ipulog_handle *ulog_handle;
243 static uint32_t ulog_gmask = 1;
244 static char *cap_buf;
245 static int nsnmp_rules;
246 static struct snmp_rule *snmp_rules;
247 static struct passwd *pw = 0;
252 "fprobe-ulog: a NetFlow probe. Version %s\n"
253 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
255 "-h\t\tDisplay this help\n"
256 "-U <mask>\tULOG group bitwise mask [1]\n"
257 "-s <seconds>\tHow often scan for expired flows [5]\n"
258 "-g <seconds>\tFragmented flow lifetime [30]\n"
259 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
260 "-f <filename>\tLog flow data in a file\n"
261 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
262 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
263 "-a <address>\tUse <address> as source for NetFlow flow\n"
264 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
265 "-M\t\tUse netfilter mark value as ToS flag\n"
266 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
267 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
268 "-q <flows>\tPending queue length [100]\n"
269 "-B <kilobytes>\tKernel capture buffer size [0]\n"
270 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
271 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
272 "-c <directory>\tDirectory to chroot to\n"
273 "-u <user>\tUser to run as\n"
274 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
275 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
276 "-y <remote:port>\tAddress of the NetFlow collector\n"
277 "-f <writable file>\tFile to write data into\n"
278 "-T <n>\tRotate log file every n epochs\n"
279 "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
280 "-E <[1..60]>\tSize of an epoch in minutes\n"
281 "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
283 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
287 #if ((DEBUG) & DEBUG_I)
290 my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
291 pkts_total, pkts_total_fragmented, size_total,
292 pkts_pending - pkts_pending_done, pending_queue_trace);
293 my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
294 pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
295 my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
296 flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
297 my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
298 total_elements, free_elements, total_memory);
302 void sighandler(int sig)
/* Async-signal handler: records which signal arrived in the 'sigs'
 * bitmask so the main loop can act on it outside signal context.
 * NOTE(review): chunk is gappy - the switch on 'sig' and further
 * signal cases are not visible here; verify against the full file. */
306 sigs |= SIGTERM_MASK;
308 #if ((DEBUG) & DEBUG_I)
310 sigs |= SIGUSR1_MASK;
316 void gettime(struct Time *now)
322 now->usec = t.tv_usec;
326 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
328 return (t1->sec - t2->sec)/60;
331 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
333 return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
336 /* Uptime in miliseconds */
337 uint32_t getuptime(struct Time *t)
339 /* Maximum uptime is about 49/2 days */
340 return cmpmtime(t, &start_time);
343 /* Uptime in minutes */
344 uint32_t getuptime_minutes(struct Time *t)
346 /* Maximum uptime is about 49/2 days */
347 return cmpMtime(t, &start_time);
350 hash_t hash_flow(struct Flow *flow)
352 if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
353 else return hash(flow, sizeof(struct Flow_TL));
356 uint16_t snmp_index(char *name) {
359 if (!*name) return 0;
361 for (i = 0; (int) i < nsnmp_rules; i++) {
362 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
363 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
366 if ((i = if_nametoindex(name))) return i;
/* Field-by-field copy of one flow record into another.
 * NOTE(review): this chunk is gappy - copies of the address, port and
 * interface fields appear to live in lines not visible here; confirm
 * against the full file before relying on this list being complete. */
inline void copy_flow(struct Flow *src, struct Flow *dst)
378 dst->slice_id = src->slice_id;
379 dst->proto = src->proto;
380 dst->tcp_flags = src->tcp_flags;
384 dst->pkts = src->pkts;
385 dst->size = src->size;
386 dst->sizeF = src->sizeF; /* bytes accumulated from fragments (see put_into) */
387 dst->sizeP = src->sizeP; /* whole-datagram size learned from the last fragment */
388 dst->ctime = src->ctime; /* creation timestamp */
389 dst->mtime = src->mtime; /* last-modified timestamp */
390 dst->flags = src->flags;
/* Restore the last persisted epoch number from LAST_EPOCH_FILE so a
 * restarted fprobe resumes log rotation where it left off
 * (counterpart of update_cur_epoch_file()). */
void read_cur_epoch() {
395 /* Reset to -1 in case the read fails */
397 fd = open(LAST_EPOCH_FILE, O_RDONLY);
399 char snum[MAX_EPOCH_SIZE];
/* NOTE(review): snum is not visibly NUL-terminated before sscanf, and
 * %d is scanned into the unsigned global cur_epoch - the missing lines
 * may handle both; verify against the full file. */
401 len = read(fd, snum, MAX_EPOCH_SIZE-1);
404 sscanf(snum,"%d",&cur_epoch);
405 cur_epoch++; /* Let's not stone the last epoch */
413 /* Dumps the current epoch in a file to cope with
414 * reboots and killings of fprobe */
416 void update_cur_epoch_file(int n) {
418 char snum[MAX_EPOCH_SIZE];
419 len=snprintf(snum, MAX_EPOCH_SIZE-1,"%d", n);
420 fd = open(LAST_EPOCH_FILE, O_RDWR|O_CREAT|O_TRUNC);
422 my_log(LOG_ERR, "open() failed: %s.The next restart will resume logging from epoch id 0.",LAST_EPOCH_FILE);
425 write(fd, snum, len);
429 /* Get the file descriptor corresponding to the current file.
430 * The kludgy implementation is to abstract away the 'current
431 * file descriptor', which may also be a socket.
/* Return a file descriptor for the current data file, rotating to a new
 * epoch file ("<fname>.<epoch>") when the epoch length has elapsed, on
 * startup, or when wrapping back to the first epoch.  Also refuses to
 * fill the disk: when free blocks drop below min_free it starts reusing
 * old epoch files / dropping data.
 * NOTE(review): chunk is gappy - returns and some branches are missing. */
unsigned get_data_file_fd(char *fname, int cur_fd) {
438 struct statfs statfs;
441 /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
442 * doesn't solve the problem */
444 cur_uptime = getuptime_minutes(&now);
446 if (cur_fd != START_DATA_FD) {
447 if (fstatfs(cur_fd, &statfs) == -1) {
448 my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
/* Disk-full guard: only trips once we've reached the highest epoch
 * written so far (cur_epoch == last_peak), i.e. we can no longer
 * reclaim space by overwriting our own older files. */
451 if (min_free && (statfs.f_bavail < min_free)
452 && (cur_epoch==last_peak))
454 my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
459 assume that we can reclaim space by overwriting our own files
460 and that the difference in size will not fill the disk - sapan
465 /* If epoch length has been exceeded,
466 * or we're starting up
467 * or we're going back to the first epoch */
/* NOTE(review): cur_epoch is declared unsigned, so 'cur_epoch==-1'
 * compares against UINT_MAX - intentional sentinel, but fragile. */
468 if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
469 char nextname[MAX_PATH_LEN];
471 prev_uptime = cur_uptime;
472 cur_epoch = (cur_epoch + 1) % log_epochs; /* round-robin over log_epochs files */
473 if (cur_epoch>last_peak) last_peak = cur_epoch;
476 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
/* NOTE(review): open() with O_CREAT lacks the mode argument, so the
 * file is first created with unspecified permissions; the fchmod()
 * below papers over it but leaves a window - consider passing the
 * mode to open() directly. */
477 if ((write_fd = open(nextname, O_RDWR|O_CREAT|O_TRUNC)) < 0) {
478 my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
481 if (fchmod(write_fd,S_IRUSR|S_IWUSR|S_IROTH|S_IRGRP) == -1) {
482 my_log(LOG_ERR, "fchmod() failed: %s (%s). Continuing...\n", nextname, strerror(errno));
484 update_cur_epoch_file(cur_epoch); /* persist for restart recovery */
/* Search the hash-bucket chain 'where' for a flow matching 'what'.
 * A match requires identical src/dst address and protocol, plus either
 * matching ports (both unfragmented) or matching IP id (both
 * fragmented).  If 'prev' is non-NULL it is updated with the address of
 * the link pointer to the match so the caller can unlink it.
 * NOTE(review): chunk is gappy - the loop header, case labels and
 * return statements are not visible here. */
struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
495 struct Flow **flowpp;
501 if (prev) flowpp = *prev;
/* Primary key: addresses and protocol must match exactly */
504 if (where->sip.s_addr == what->sip.s_addr
505 && where->dip.s_addr == what->dip.s_addr
506 && where->proto == what->proto) {
507 switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
509 /* Both unfragmented: ports complete the key */
510 if ((what->sp == where->sp)
511 && (what->dp == where->dp)) goto done;
514 /* Both fragmented: IP identification completes the key */
515 if (where->id == what->id) goto done;
519 flowpp = &where->next; /* advance the link pointer down the chain */
523 if (prev) *prev = flowpp;
/* Insert 'flow' into the flow cache bucket chosen by its hash.
 * If no matching entry exists the flow is added (COPY_INTO duplicates
 * it via mem_alloc(); MOVE_INTO links the caller's record in).  If a
 * matching entry exists, its counters/timestamps are merged in, and a
 * fragmented flow whose last fragment completes the datagram is
 * unlinked, cleared of FLOW_FRAG and re-inserted recursively.
 * NOTE(review): chunk is gappy - locals, braces and several statements
 * are missing; comments below annotate only what is visible. */
int put_into(struct Flow *flow, int flag
528 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
535 struct Flow *flown, **flowpp;
536 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
542 sprintf(buf, " %x H:%04x", (unsigned) flow, h);
/* One mutex per hash bucket keeps contention low */
545 pthread_mutex_lock(&flows_mutex[h]);
547 if (!(flown = find(flows[h], flow, &flowpp))) {
548 /* No suitable flow found - add */
549 if (flag == COPY_INTO) {
550 if ((flown = mem_alloc())) {
551 copy_flow(flow, flown);
554 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
555 my_log(LOG_ERR, "%s %s. %s",
556 "mem_alloc():", strerror(errno), "packet lost");
/* Link at the head of the bucket chain */
561 flow->next = flows[h];
563 #if ((DEBUG) & DEBUG_I)
565 if (flow->flags & FLOW_FRAG) flows_fragmented++;
567 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
569 sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
574 /* Found suitable flow - update */
575 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
576 sprintf(buf, " +> %x", (unsigned) flown);
/* Keep the widest time window: latest mtime, earliest ctime */
579 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
580 flown->mtime = flow->mtime;
581 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
582 flown->ctime = flow->ctime;
583 flown->tcp_flags |= flow->tcp_flags;
584 flown->size += flow->size;
585 flown->pkts += flow->pkts;
587 /* The slice_id of the first slice_id of a flow is misleading. Reset the slice_id of the flow
588 * if a better value comes along. A good example of this is that by the time CoDemux sets the
589 * peercred of a flow, it has already been accounted for here and attributed to root. */
591 if (flown->slice_id<1)
592 flown->slice_id = flow->slice_id;
595 if (flow->flags & FLOW_FRAG) {
596 /* Fragmented flow require some additional work */
597 if (flow->flags & FLOW_TL) {
/* First fragment carries the transport-layer ports */
600 Several packets with FLOW_TL (attack)
602 flown->sp = flow->sp;
603 flown->dp = flow->dp;
605 if (flow->flags & FLOW_LASTFRAG) {
/* Last fragment tells us the total datagram size */
608 Several packets with FLOW_LASTFRAG (attack)
610 flown->sizeP = flow->sizeP;
612 flown->flags |= flow->flags;
613 flown->sizeF += flow->sizeF;
614 if ((flown->flags & FLOW_LASTFRAG)
615 && (flown->sizeF >= flown->sizeP)) {
616 /* All fragments received - flow reassembled */
617 *flowpp = flown->next;
/* Drop the bucket lock before the recursive re-insert to avoid
 * self-deadlock if the defragmented flow hashes to the same bucket */
618 pthread_mutex_unlock(&flows_mutex[h]);
619 #if ((DEBUG) & DEBUG_I)
624 flown->flags &= ~FLOW_FRAG;
625 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
628 ret = put_into(flown, MOVE_INTO
629 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
/* MOVE_INTO transfers ownership: the source record is returned
 * to the allocator once merged */
635 if (flag == MOVE_INTO) mem_free(flow);
637 pthread_mutex_unlock(&flows_mutex[h]);
/* Serialize one NetFlow record (or header) into the emit buffer.
 * Walks the 'format' array of field ids ('fields' entries), writing
 * each field of 'flow' at 'p' in network byte order and advancing p by
 * that field's size.  Header fields (VERSION/COUNT/UPTIME/...) ignore
 * 'flow' and read the emit_* globals instead; callers pass flow == 0
 * for headers.  Returns the advanced pointer.
 * NOTE(review): chunk is gappy - the switch header and break
 * statements are not visible; 'p' is void* so the arithmetic below
 * relies on the GCC void-pointer extension. */
void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
647 for (i = 0; i < fields; i++) {
648 #if ((DEBUG) & DEBUG_F)
649 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
652 case NETFLOW_IPV4_SRC_ADDR:
653 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
654 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
657 case NETFLOW_IPV4_DST_ADDR:
658 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
/* Hard-coded test hook: log when the designated test flow is emitted */
659 if ((flow->dip.s_addr == inet_addr("10.0.0.8"))) {
660 my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
662 p += NETFLOW_IPV4_DST_ADDR_SIZE;
665 case NETFLOW_INPUT_SNMP:
666 *((uint16_t *) p) = htons(flow->iif);
667 p += NETFLOW_INPUT_SNMP_SIZE;
670 case NETFLOW_OUTPUT_SNMP:
671 *((uint16_t *) p) = htons(flow->oif);
672 p += NETFLOW_OUTPUT_SNMP_SIZE;
675 case NETFLOW_PKTS_32:
676 *((uint32_t *) p) = htonl(flow->pkts);
677 p += NETFLOW_PKTS_32_SIZE;
680 case NETFLOW_BYTES_32:
681 *((uint32_t *) p) = htonl(flow->size);
682 p += NETFLOW_BYTES_32_SIZE;
685 case NETFLOW_FIRST_SWITCHED:
686 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
687 p += NETFLOW_FIRST_SWITCHED_SIZE;
690 case NETFLOW_LAST_SWITCHED:
691 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
692 p += NETFLOW_LAST_SWITCHED_SIZE;
/* Ports are stored in network order already - no htons needed */
695 case NETFLOW_L4_SRC_PORT:
696 *((uint16_t *) p) = flow->sp;
697 p += NETFLOW_L4_SRC_PORT_SIZE;
700 case NETFLOW_L4_DST_PORT:
701 *((uint16_t *) p) = flow->dp;
702 p += NETFLOW_L4_DST_PORT_SIZE;
706 *((uint8_t *) p) = flow->proto;
707 p += NETFLOW_PROT_SIZE;
710 case NETFLOW_SRC_TOS:
711 *((uint8_t *) p) = flow->tos;
712 p += NETFLOW_SRC_TOS_SIZE;
715 case NETFLOW_TCP_FLAGS:
716 *((uint8_t *) p) = flow->tcp_flags;
717 p += NETFLOW_TCP_FLAGS_SIZE;
/* --- header-only fields below: read emit_* globals, not 'flow' --- */
720 case NETFLOW_VERSION:
721 *((uint16_t *) p) = htons(netflow->Version);
722 p += NETFLOW_VERSION_SIZE;
726 *((uint16_t *) p) = htons(emit_count);
727 p += NETFLOW_COUNT_SIZE;
731 *((uint32_t *) p) = htonl(getuptime(&emit_time));
732 p += NETFLOW_UPTIME_SIZE;
735 case NETFLOW_UNIX_SECS:
736 *((uint32_t *) p) = htonl(emit_time.sec);
737 p += NETFLOW_UNIX_SECS_SIZE;
740 case NETFLOW_UNIX_NSECS:
741 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
742 p += NETFLOW_UNIX_NSECS_SIZE;
745 case NETFLOW_FLOW_SEQUENCE:
/* Sequence deliberately zeroed instead of emit_sequence - TODO confirm
 * downstream collectors do not rely on it */
746 //*((uint32_t *) p) = htonl(emit_sequence);
747 *((uint32_t *) p) = 0;
748 p += NETFLOW_FLOW_SEQUENCE_SIZE;
752 /* Unsupported (uint8_t) - emitted as zero padding */
753 case NETFLOW_ENGINE_TYPE:
754 case NETFLOW_ENGINE_ID:
755 case NETFLOW_FLAGS7_1:
756 case NETFLOW_SRC_MASK:
757 case NETFLOW_DST_MASK:
759 my_log(LOG_CRIT, "Adding SRC/DST masks: this version of fprobe is seriously broken\n");
762 *((uint8_t *) p) = 0;
763 p += NETFLOW_PAD8_SIZE;
/* PlanetLab extension: slice id written as-is (host byte order) */
765 case NETFLOW_SLICE_ID:
766 *((uint32_t *) p) = flow->slice_id;
767 p += NETFLOW_SLICE_ID_SIZE;
770 /* Unsupported (uint16_t) - emitted as zero padding */
773 case NETFLOW_FLAGS7_2:
774 *((uint16_t *) p) = 0;
775 p += NETFLOW_PAD16_SIZE;
779 /* Unsupported (uint32_t) - emitted as zero padding */
780 case NETFLOW_IPV4_NEXT_HOP:
781 case NETFLOW_ROUTER_SC:
782 *((uint32_t *) p) = 0;
783 p += NETFLOW_PAD32_SIZE;
787 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
788 format, i, format[i]);
792 #if ((DEBUG) & DEBUG_F)
793 my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
800 Workaround for clone()-based threads
801 Try to change EUID independently of main thread
805 setregid(pw->pw_gid, pw->pw_gid);
806 setreuid(pw->pw_uid, pw->pw_uid);
815 struct timespec timeout;
816 int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
818 p = (void *) &emit_packet + netflow->HeaderSize;
824 pthread_mutex_lock(&emit_mutex);
825 while (!flows_emit) {
826 gettimeofday(&now, 0);
827 timeout.tv_sec = now.tv_sec + emit_timeout;
828 /* Do not wait until emit_packet is filled - it may take too long */
829 if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
830 pthread_mutex_unlock(&emit_mutex);
835 flows_emit = flows_emit->next;
836 #if ((DEBUG) & DEBUG_I)
839 pthread_mutex_unlock(&emit_mutex);
843 gettime(&start_time);
844 start_time.sec -= start_time_offset;
847 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
851 printf("Emit count = %d\n", emit_count);
854 if (emit_count == netflow->MaxFlows) {
857 p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
858 size = netflow->HeaderSize + emit_count * netflow->FlowSize;
859 /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
860 #ifdef STD_NETFLOW_PDU
861 if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
864 for (i = 0; i < npeers; i++) {
865 if (peers[i].type == PEER_FILE) {
866 if (netflow->SeqOffset)
867 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
868 peers[i].write_fd = get_data_file_fd(peers[i].fname, peers[i].write_fd);
869 ret = write(peers[i].write_fd, emit_packet, size);
872 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
873 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
874 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
878 #if ((DEBUG) & DEBUG_E)
880 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
881 emit_count, i + 1, peers[i].seq);
884 peers[i].seq += emit_count;
887 if (emit_rate_bytes) {
889 delay = sent / emit_rate_bytes;
891 sent %= emit_rate_bytes;
893 timeout.tv_nsec = emit_rate_delay * delay;
894 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
899 if (peers[i].type == PEER_MIRROR) goto sendreal;
901 if (peers[i].type == PEER_ROTATE)
902 if (peer_rot_cur++ == peer_rot_work) {
904 if (netflow->SeqOffset)
905 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
906 ret = send(peers[i].write_fd, emit_packet, size, 0);
908 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
909 my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
910 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
913 #if ((DEBUG) & DEBUG_E)
915 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
916 emit_count, i + 1, peers[i].seq);
919 peers[i].seq += emit_count;
922 if (emit_rate_bytes) {
924 delay = sent / emit_rate_bytes;
926 sent %= emit_rate_bytes;
928 timeout.tv_nsec = emit_rate_delay * delay;
929 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
934 if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
935 emit_sequence += emit_count;
937 #if ((DEBUG) & DEBUG_I)
/* Worker thread: drains completed entries from the pending ring.
 * Waits (with unpending_timeout) for cap_thread() to mark the tail
 * entry FLOW_PENDING, copies it into the flow cache via put_into(),
 * then clears the flag and advances pending_tail.
 * NOTE(review): chunk is gappy - outer loop, locals and error branch
 * are partly missing. */
void *unpending_thread()
947 struct timespec timeout;
948 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
955 pthread_mutex_lock(&unpending_mutex);
/* Sleep until the tail entry becomes ready (or timeout elapses) */
958 while (!(pending_tail->flags & FLOW_PENDING)) {
959 gettimeofday(&now, 0);
960 timeout.tv_sec = now.tv_sec + unpending_timeout;
961 pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
964 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
/* COPY_INTO: the ring slot is reused, so the cache gets its own copy */
967 if (put_into(pending_tail, COPY_INTO
968 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
972 #if ((DEBUG) & DEBUG_I)
973 pkts_lost_unpending++;
977 #if ((DEBUG) & DEBUG_U)
978 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
/* Release the slot back to cap_thread() and advance */
981 pending_tail->flags = 0;
982 pending_tail = pending_tail->next;
983 #if ((DEBUG) & DEBUG_I)
991 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
995 struct Flow *flow, **flowpp;
997 struct timespec timeout;
1001 timeout.tv_nsec = 0;
1002 pthread_mutex_lock(&scan_mutex);
1006 timeout.tv_sec = now.sec + scan_interval;
1007 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
1010 #if ((DEBUG) & DEBUG_S)
1011 my_log(LOG_DEBUG, "S: %d", now.sec);
1013 for (i = 0; i < 1 << HASH_BITS ; i++) {
1014 pthread_mutex_lock(&flows_mutex[i]);
1018 if (flow->flags & FLOW_FRAG) {
1019 /* Process fragmented flow */
1020 if ((now.sec - flow->mtime.sec) > frag_lifetime) {
1021 /* Fragmented flow expired - put it into special chain */
1022 #if ((DEBUG) & DEBUG_I)
1026 *flowpp = flow->next;
1028 flow->flags &= ~FLOW_FRAG;
1029 flow->next = scan_frag_dreg;
1030 scan_frag_dreg = flow;
1035 /* Flow is not fragmented */
1036 if ((now.sec - flow->mtime.sec) > inactive_lifetime
1037 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
1039 #if ((DEBUG) & DEBUG_S)
1040 my_log(LOG_DEBUG, "S: E %x", flow);
1042 #if ((DEBUG) & DEBUG_I)
1045 *flowpp = flow->next;
1046 pthread_mutex_lock(&emit_mutex);
1047 flow->next = flows_emit;
1049 #if ((DEBUG) & DEBUG_I)
1052 pthread_mutex_unlock(&emit_mutex);
1057 flowpp = &flow->next;
1060 pthread_mutex_unlock(&flows_mutex[i]);
1062 if (flows_emit) pthread_cond_signal(&emit_cond);
1064 while (scan_frag_dreg) {
1065 flow = scan_frag_dreg;
1066 scan_frag_dreg = flow->next;
1067 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1070 put_into(flow, MOVE_INTO
1071 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1075 #if ((DEBUG) & DEBUG_S)
1076 my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
1084 struct ulog_packet_msg *ulog_msg;
1088 int len, off_frag, psize;
1089 #if ((DEBUG) & DEBUG_C)
1098 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1100 my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1103 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1105 #if ((DEBUG) & DEBUG_C)
1106 sprintf(logbuf, "C: %d", ulog_msg->data_len);
1109 nl = (void *) &ulog_msg->payload;
1110 psize = ulog_msg->data_len;
1113 if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1114 #if ((DEBUG) & DEBUG_C)
1115 strcat(logbuf, " U");
1116 my_log(LOG_DEBUG, "%s", logbuf);
1118 #if ((DEBUG) & DEBUG_I)
1124 if (pending_head->flags) {
1125 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1127 # if ((DEBUG) & DEBUG_C)
1132 "pending queue full:", "packet lost");
1134 #if ((DEBUG) & DEBUG_I)
1135 pkts_lost_capture++;
1140 #if ((DEBUG) & DEBUG_I)
1144 flow = pending_head;
1146 /* ?FIXME? Add sanity check for ip_len? */
1147 flow->size = ntohs(nl->ip_len);
1148 #if ((DEBUG) & DEBUG_I)
1149 size_total += flow->size;
1152 flow->sip = nl->ip_src;
1153 flow->dip = nl->ip_dst;
1154 flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1156 /* It's going to be expensive calling this syscall on every flow.
1157 * We should keep a local hash table, for now just bear the overhead... - Sapan*/
1161 if (ulog_msg->mark > 0) {
1162 flow->slice_id = xid_to_slice_id(ulog_msg->mark);
1165 if (flow->slice_id < 1)
1166 flow->slice_id = ulog_msg->mark; // Couldn't look up the slice id, let's at least store the local xid
1169 if ((flow->dip.s_addr == inet_addr("10.0.0.8")) || (flow->sip.s_addr == inet_addr("10.0.0.8"))) {
1170 my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->slice_id);
1172 flow->iif = snmp_index(ulog_msg->indev_name);
1173 flow->oif = snmp_index(ulog_msg->outdev_name);
1174 flow->proto = nl->ip_p;
1176 flow->tcp_flags = 0;
1180 /* Packets captured from the OUTPUT table do not contain a valid timestamp */
1181 if (ulog_msg->timestamp_sec) {
1182 flow->ctime.sec = ulog_msg->timestamp_sec;
1183 flow->ctime.usec = ulog_msg->timestamp_usec;
1184 } else gettime(&flow->ctime);
1185 flow->mtime = flow->ctime;
1187 off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1190 Offset (from network layer) to transport layer header/IP data
1191 IOW IP header size ;-)
1194 Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1196 off_tl = nl->ip_hl << 2;
1197 tl = (void *) nl + off_tl;
1199 /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1200 flow->sizeF = ntohs(nl->ip_len) - off_tl;
1202 if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1203 if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1205 if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1206 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1207 #if ((DEBUG) & DEBUG_C)
1208 strcat(logbuf, " F");
1210 #if ((DEBUG) & DEBUG_I)
1211 pkts_total_fragmented++;
1213 flow->flags |= FLOW_FRAG;
1214 flow->id = nl->ip_id;
1216 if (!(ntohs(nl->ip_off) & IP_MF)) {
1217 /* The last fragment (IP_MF == 0) tells us the whole datagram size */
1218 flow->flags |= FLOW_LASTFRAG;
1219 /* size = frag_offset*8 + data_size */
1220 flow->sizeP = off_frag + flow->sizeF;
1224 #if ((DEBUG) & DEBUG_C)
1225 sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1226 strcat(logbuf, buf);
1227 sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1228 strcat(logbuf, buf);
1232 Fortunately most interesting transport layer information fit
1233 into first 8 bytes of IP data field (minimal nonzero size).
1234 Thus we don't need actual packet reassembling to build whole
1235 transport layer data. We only check the fragment offset for
1236 zero value to find packet with this information.
1238 if (!off_frag && psize >= 8) {
1239 switch (flow->proto) {
1242 flow->sp = ((struct udphdr *)tl)->uh_sport;
1243 flow->dp = ((struct udphdr *)tl)->uh_dport;
1248 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1249 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1252 #ifdef ICMP_TRICK_CISCO
1254 flow->dp = *((int32_t *) tl);
1259 /* Unknown transport layer */
1260 #if ((DEBUG) & DEBUG_C)
1261 strcat(logbuf, " U");
1268 #if ((DEBUG) & DEBUG_C)
1269 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1270 strcat(logbuf, buf);
1272 flow->flags |= FLOW_TL;
1276 /* Check for tcp flags presence (including CWR and ECE). */
1277 if (flow->proto == IPPROTO_TCP
1279 && psize >= 16 - off_frag) {
1280 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1281 #if ((DEBUG) & DEBUG_C)
1282 sprintf(buf, " TCP:%x", flow->tcp_flags);
1283 strcat(logbuf, buf);
1287 #if ((DEBUG) & DEBUG_C)
1288 sprintf(buf, " => %x", (unsigned) flow);
1289 strcat(logbuf, buf);
1290 my_log(LOG_DEBUG, "%s", logbuf);
1293 #if ((DEBUG) & DEBUG_I)
1295 pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1296 if (pending_queue_trace < pending_queue_trace_candidate)
1297 pending_queue_trace = pending_queue_trace_candidate;
1300 /* Flow complete - inform unpending_thread() about it */
1301 pending_head->flags |= FLOW_PENDING;
1302 pending_head = pending_head->next;
1304 pthread_cond_signal(&unpending_cond);
1310 /* Copied out of CoDemux */
/* Daemonize: fork, have the parent record the child's pid in PIDFILE
 * and exit, then detach the child (new session, chdir).
 * NOTE(review): chunk is gappy - returns, fclose, setsid()/umask()
 * calls are not visible here; verify against the full file. */
static int init_daemon() {
1316 pidfile = fopen(PIDFILE, "w");
1317 if (pidfile == NULL) {
1318 my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
1321 if ((pid = fork()) < 0) {
1323 my_log(LOG_ERR, "Could not fork!\n");
1326 else if (pid != 0) {
1327 /* i'm the parent, writing down the child pid */
1328 fprintf(pidfile, "%u\n", pid);
1333 /* close the pid file */
1336 /* routines for any daemon process
1337 1. create a new session
1338 2. change directory to the root
1339 3. change the file creation permission
/* NOTE(review): chdir goes to /var/local/fprobe, not "/" as the
 * comment above says - confirm the directory exists at install time */
1342 chdir("/var/local/fprobe");
1348 int main(int argc, char **argv)
1351 char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1352 int c, i, write_fd, memory_limit = 0;
1353 struct addrinfo hints, *res;
1354 struct sockaddr_in saddr;
1355 pthread_attr_t tattr;
1356 struct sigaction sigact;
1357 static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1358 struct timeval timeout;
1360 sched_min = sched_get_priority_min(SCHED);
1361 sched_max = sched_get_priority_max(SCHED);
1363 memset(&saddr, 0 , sizeof(saddr));
1364 memset(&hints, 0 , sizeof(hints));
1365 hints.ai_flags = AI_PASSIVE;
1366 hints.ai_family = AF_INET;
1367 hints.ai_socktype = SOCK_DGRAM;
1369 /* Process command line options */
1372 while ((c = my_getopt(argc, argv, parms)) != -1) {
1382 if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1383 if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1384 if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1385 if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1386 if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1387 if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1388 if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1389 if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1390 if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1391 if (parms[nflag].count) {
1392 switch (atoi(parms[nflag].arg)) {
1394 netflow = &NetFlow1;
1401 netflow = &NetFlow7;
1405 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1409 if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1410 if (parms[lflag].count) {
1411 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1414 sprintf(errpbuf, "[%s]", log_suffix);
1415 strcat(ident, errpbuf);
1418 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1419 if (log_suffix) *--log_suffix = ':';
1421 if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1423 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1426 sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1427 if (parms[qflag].count) {
1428 pending_queue_length = atoi(parms[qflag].arg);
1429 if (pending_queue_length < 1) {
1430 fprintf(stderr, "Illegal %s\n", "pending queue length");
1434 if (parms[rflag].count) {
1435 schedp.sched_priority = atoi(parms[rflag].arg);
1436 if (schedp.sched_priority
1437 && (schedp.sched_priority < sched_min
1438 || schedp.sched_priority > sched_max)) {
1439 fprintf(stderr, "Illegal %s\n", "realtime priority");
1443 if (parms[Bflag].count) {
1444 sockbufsize = atoi(parms[Bflag].arg) << 10;
1446 if (parms[bflag].count) {
1447 bulk_quantity = atoi(parms[bflag].arg);
1448 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1449 fprintf(stderr, "Illegal %s\n", "bulk size");
1453 if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1454 if (parms[Xflag].count) {
1455 for(i = 0; parms[Xflag].arg[i]; i++)
1456 if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1457 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1459 rule = strtok(parms[Xflag].arg, ":");
1460 for (i = 0; rule; i++) {
1461 snmp_rules[i].len = strlen(rule);
1462 if (snmp_rules[i].len > IFNAMSIZ) {
1463 fprintf(stderr, "Illegal %s\n", "interface basename");
1466 strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1467 if (!*(rule - 1)) *(rule - 1) = ',';
1468 rule = strtok(NULL, ",");
1470 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1473 snmp_rules[i].base = atoi(rule);
1475 rule = strtok(NULL, ":");
1479 if (parms[tflag].count)
1480 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1481 if (parms[aflag].count) {
1482 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1484 fprintf(stderr, "Illegal %s\n", "source address");
1487 saddr = *((struct sockaddr_in *) res->ai_addr);
1491 if (parms[uflag].count)
1492 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1493 fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1498 /* Process collectors parameters. Brrrr... :-[ */
1500 npeers = argc - optind;
1502 /* Send to remote Netflow collector */
1503 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1504 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1506 if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1508 if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1509 fprintf(stderr, "socket(): %s\n", strerror(errno));
1512 peers[npeers].write_fd = write_fd;
1513 peers[npeers].type = PEER_MIRROR;
1514 peers[npeers].laddr = saddr;
1515 peers[npeers].seq = 0;
1516 if ((lhost = strchr(dport, '/'))) {
1518 if ((type = strchr(lhost, '/'))) {
1526 peers[npeers].type = PEER_ROTATE;
1535 if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1536 peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1540 if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1541 sizeof(struct sockaddr_in))) {
1542 fprintf(stderr, "bind(): %s\n", strerror(errno));
1545 if (getaddrinfo(dhost, dport, &hints, &res)) {
1547 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1550 peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1552 if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1553 sizeof(struct sockaddr_in))) {
1554 fprintf(stderr, "connect(): %s\n", strerror(errno));
1558 /* Restore command line */
1559 if (type) *--type = '/';
1560 if (lhost) *--lhost = '/';
1564 else if (parms[fflag].count) {
1566 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1567 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1568 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1570 peers[npeers].write_fd = START_DATA_FD;
1571 peers[npeers].type = PEER_FILE;
1572 peers[npeers].seq = 0;
1581 if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1582 ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1584 fprintf(stderr, "libipulog initialization error: %s",
1585 ipulog_strerror(ipulog_errno));
1589 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1590 &sockbufsize, sizeof(sockbufsize)) < 0)
1591 fprintf(stderr, "setsockopt(): %s", strerror(errno));
1593 /* Daemonize (if log destination stdout-free) */
1595 my_log_open(ident, verbosity, log_dest);
1599 if (!(log_dest & 2)) {
1600 /* Crash-proofing - Sapan*/
1604 fprintf(stderr, "fork(): %s", strerror(errno));
1609 freopen("/dev/null", "r", stdin);
1610 freopen("/dev/null", "w", stdout);
1611 freopen("/dev/null", "w", stderr);
1615 while (wait3(NULL,0,NULL) < 1);
1619 setvbuf(stdout, (char *)0, _IONBF, 0);
1620 setvbuf(stderr, (char *)0, _IONBF, 0);
1624 sprintf(errpbuf, "[%ld]", (long) pid);
1625 strcat(ident, errpbuf);
1627 /* Initialization */
1629 init_slice_id_hash();
1630 hash_init(); /* Actually for crc16 only */
1631 mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1632 for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1635 /* Hope 12 days is enough :-/ */
1636 start_time_offset = 1 << 20;
1638 /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1640 gettime(&start_time);
1643 Build static pending queue as circular buffer.
1645 if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1646 pending_tail = pending_head;
1647 for (i = pending_queue_length - 1; i--;) {
1648 if (!(pending_tail->next = mem_alloc())) {
1650 my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1653 pending_tail = pending_tail->next;
1655 pending_tail->next = pending_head;
1656 pending_tail = pending_head;
1658 sigemptyset(&sig_mask);
1659 sigact.sa_handler = &sighandler;
1660 sigact.sa_mask = sig_mask;
1661 sigact.sa_flags = 0;
1662 sigaddset(&sig_mask, SIGTERM);
1663 sigaction(SIGTERM, &sigact, 0);
1664 #if ((DEBUG) & DEBUG_I)
1665 sigaddset(&sig_mask, SIGUSR1);
1666 sigaction(SIGUSR1, &sigact, 0);
1668 if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1669 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1673 my_log(LOG_INFO, "Starting %s...", VERSION);
1675 if (parms[cflag].count) {
1676 if (chdir(parms[cflag].arg) || chroot(".")) {
1677 my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1682 schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1683 pthread_attr_init(&tattr);
1684 for (i = 0; i < THREADS - 1; i++) {
1685 if (schedp.sched_priority > 0) {
1686 if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1687 (pthread_attr_setschedparam(&tattr, &schedp))) {
1688 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1692 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1693 my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1696 pthread_detach(thid);
1697 schedp.sched_priority++;
1701 if (setgroups(0, NULL)) {
1702 my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1705 if (setregid(pw->pw_gid, pw->pw_gid)) {
1706 my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1709 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1710 my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1715 if (!(pidfile = fopen(pidfilepath, "w")))
1716 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1718 fprintf(pidfile, "%ld\n", (long) pid);
1722 my_log(LOG_INFO, "pid: %d", pid);
1723 my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1724 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1725 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1726 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1727 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1728 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1729 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1730 for (i = 0; i < nsnmp_rules; i++) {
1731 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1732 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1734 for (i = 0; i < npeers; i++) {
1735 switch (peers[i].type) {
1743 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1744 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1745 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1748 pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1750 timeout.tv_usec = 0;
1752 || (total_elements - free_elements - pending_queue_length)
1754 || pending_tail->flags) {
1757 timeout.tv_sec = scan_interval;
1758 select(0, 0, 0, 0, &timeout);
1761 if (sigs & SIGTERM_MASK && !killed) {
1762 sigs &= ~SIGTERM_MASK;
1763 my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1766 active_lifetime = -1;
1767 inactive_lifetime = -1;
1769 unpending_timeout = 1;
1771 pthread_cond_signal(&scan_cond);
1772 pthread_cond_signal(&unpending_cond);
1775 #if ((DEBUG) & DEBUG_I)
1776 if (sigs & SIGUSR1_MASK) {
1777 sigs &= ~SIGUSR1_MASK;
1782 remove(pidfilepath);
1783 #if ((DEBUG) & DEBUG_I)
1786 my_log(LOG_INFO, "Done.");