2 Copyright (C) Slava Astashonok <sla@0n.ru>
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License.
7 $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
9 Sapan Bhatia <sapanb@cs.princeton.edu>
11 7/11/2007 Added data collection (-f) functionality, slice_id support in the header and log file
14 15/11/2007 Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
20 /* stdout, stderr, freopen() */
26 /* getopt(), alarm(), getpid(), sedsid(), chdir() */
39 #include <libipulog/libipulog.h>
40 /* #include "vserver.h" */
42 struct ipulog_handle {
45 struct sockaddr_nl local;
46 struct sockaddr_nl peer;
47 struct nlmsghdr* last_nlhdr;
50 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
51 #include <sys/types.h>
52 #include <netinet/in_systm.h>
53 #include <sys/socket.h>
54 #include <netinet/in.h>
55 #include <arpa/inet.h>
56 #include <netinet/ip.h>
57 #include <netinet/tcp.h>
58 #include <netinet/udp.h>
59 #include <netinet/ip_icmp.h>
62 #include <sys/param.h>
87 #include <sys/select.h>
93 #include <fprobe-ulog.h>
95 #include <my_getopt.h>
100 #define PIDFILE "/var/log/fprobe-ulog.pid"
101 #define LAST_EPOCH_FILE "/var/log/fprobe_last_epoch"
102 #define MAX_EPOCH_SIZE sizeof("32767")
103 #define STD_NETFLOW_PDU
133 static struct getopt_parms parms[] = {
134 {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
135 {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
136 {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
137 {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
138 {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
139 {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
140 {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
141 {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
142 {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
143 {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
145 {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
146 {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
148 {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
149 {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
150 {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
151 {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
152 {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
153 {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
154 {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
155 {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
156 {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
157 {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
162 extern int optind, opterr, optopt;
165 extern struct NetFlow NetFlow1;
166 extern struct NetFlow NetFlow5;
167 extern struct NetFlow NetFlow7;
169 #define START_DATA_FD -5
170 #define mark_is_tos parms[Mflag].count
171 static unsigned scan_interval = 5;
172 static unsigned int min_free = 0;
173 static int frag_lifetime = 30;
174 static int inactive_lifetime = 60;
175 static int active_lifetime = 300;
176 static int sockbufsize;
177 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
178 #if (MEM_BITS == 0) || (MEM_BITS == 16)
179 #define BULK_QUANTITY 10000
181 #define BULK_QUANTITY 200
184 static unsigned epoch_length=60, log_epochs=1;
185 static unsigned cur_epoch=0,prev_uptime=0,last_peak=0;
187 static unsigned bulk_quantity = BULK_QUANTITY;
188 static unsigned pending_queue_length = 100;
189 static struct NetFlow *netflow = &NetFlow5;
190 static unsigned verbosity = 6;
191 static unsigned log_dest = MY_LOG_SYSLOG;
192 static struct Time start_time;
193 static long start_time_offset;
196 extern unsigned total_elements;
197 extern unsigned free_elements;
198 extern unsigned total_memory;
199 #if ((DEBUG) & DEBUG_I)
200 static unsigned emit_pkts, emit_queue;
201 static uint64_t size_total;
202 static unsigned pkts_total, pkts_total_fragmented;
203 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
204 static unsigned pkts_pending, pkts_pending_done;
205 static unsigned pending_queue_trace, pending_queue_trace_candidate;
206 static unsigned flows_total, flows_fragmented;
208 static unsigned emit_count;
209 static uint32_t emit_sequence;
210 static unsigned emit_rate_bytes, emit_rate_delay;
211 static struct Time emit_time;
212 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
213 static pthread_t thid;
214 static sigset_t sig_mask;
215 static struct sched_param schedp;
216 static int sched_min, sched_max;
217 static int npeers, npeers_rot;
218 static struct peer *peers;
220 static char cur_output_file[MAX_PATH_LEN];
222 static struct Flow *flows[1 << HASH_BITS];
223 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
225 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
226 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
228 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
229 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
230 static struct Flow *pending_head, *pending_tail;
231 static struct Flow *scan_frag_dreg;
233 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
234 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
235 static struct Flow *flows_emit;
237 static char ident[256] = "fprobe-ulog";
238 static FILE *pidfile;
239 static char *pidfilepath;
242 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
243 static struct ipulog_handle *ulog_handle;
244 static uint32_t ulog_gmask = 1;
245 static char *cap_buf;
246 static int nsnmp_rules;
247 static struct snmp_rule *snmp_rules;
248 static struct passwd *pw = 0;
253 "fprobe-ulog: a NetFlow probe. Version %s\n"
254 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
256 "-h\t\tDisplay this help\n"
257 "-U <mask>\tULOG group bitwise mask [1]\n"
258 "-s <seconds>\tHow often scan for expired flows [5]\n"
259 "-g <seconds>\tFragmented flow lifetime [30]\n"
260 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
261 "-f <filename>\tLog flow data in a file\n"
262 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
263 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
264 "-a <address>\tUse <address> as source for NetFlow flow\n"
265 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
266 "-M\t\tUse netfilter mark value as ToS flag\n"
267 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
268 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
269 "-q <flows>\tPending queue length [100]\n"
270 "-B <kilobytes>\tKernel capture buffer size [0]\n"
271 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
272 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
273 "-c <directory>\tDirectory to chroot to\n"
274 "-u <user>\tUser to run as\n"
275 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
276 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
277 "-y <remote:port>\tAddress of the NetFlow collector\n"
278 "-f <writable file>\tFile to write data into\n"
279 "-T <n>\tRotate log file every n epochs\n"
280 "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
281 "-E <[1..60]>\tSize of an epoch in minutes\n"
282 "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
284 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
288 #if ((DEBUG) & DEBUG_I)
291 my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
292 pkts_total, pkts_total_fragmented, size_total,
293 pkts_pending - pkts_pending_done, pending_queue_trace);
294 my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
295 pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
296 my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
297 flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
298 my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
299 total_elements, free_elements, total_memory);
303 void sighandler(int sig)
307 sigs |= SIGTERM_MASK;
309 #if ((DEBUG) & DEBUG_I)
311 sigs |= SIGUSR1_MASK;
317 void gettime(struct Time *now)
323 now->usec = t.tv_usec;
327 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
329 return (t1->sec - t2->sec)/60;
332 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
334 return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
337 /* Uptime in miliseconds */
338 uint32_t getuptime(struct Time *t)
340 /* Maximum uptime is about 49/2 days */
341 return cmpmtime(t, &start_time);
344 /* Uptime in minutes */
345 uint32_t getuptime_minutes(struct Time *t)
347 /* Maximum uptime is about 49/2 days */
348 return cmpMtime(t, &start_time);
351 hash_t hash_flow(struct Flow *flow)
353 if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
354 else return hash(flow, sizeof(struct Flow_TL));
357 uint16_t snmp_index(char *name) {
360 if (!*name) return 0;
362 for (i = 0; (int) i < nsnmp_rules; i++) {
363 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
364 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
367 if ((i = if_nametoindex(name))) return i;
372 inline void copy_flow(struct Flow *src, struct Flow *dst)
379 dst->slice_id = src->slice_id;
380 dst->proto = src->proto;
381 dst->tcp_flags = src->tcp_flags;
385 dst->pkts = src->pkts;
386 dst->size = src->size;
387 dst->sizeF = src->sizeF;
388 dst->sizeP = src->sizeP;
389 dst->ctime = src->ctime;
390 dst->mtime = src->mtime;
391 dst->flags = src->flags;
394 void read_cur_epoch() {
396 /* Reset to -1 in case the read fails */
398 fd = open(LAST_EPOCH_FILE, O_RDONLY);
400 char snum[MAX_EPOCH_SIZE];
402 len = read(fd, snum, MAX_EPOCH_SIZE-1);
405 sscanf(snum,"%d",&cur_epoch);
406 cur_epoch++; /* Let's not stone the last epoch */
414 /* Dumps the current epoch in a file to cope with
415 * reboots and killings of fprobe */
417 void update_cur_epoch_file(int n) {
419 char snum[MAX_EPOCH_SIZE];
420 len=snprintf(snum, MAX_EPOCH_SIZE-1,"%d", n);
421 fd = open(LAST_EPOCH_FILE, O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
423 my_log(LOG_ERR, "open() failed: %s.The next restart will resume logging from epoch id 0.",LAST_EPOCH_FILE);
426 write(fd, snum, len);
430 /* Get the file descriptor corresponding to the current file.
431 * The kludgy implementation is to abstract away the 'current
432 * file descriptor', which may also be a socket.
435 unsigned get_data_file_fd(char *fname, int cur_fd) {
439 struct statfs statfs;
442 /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
443 * doesn't solve the problem */
445 cur_uptime = getuptime_minutes(&now);
447 if (cur_fd != START_DATA_FD) {
448 if (fstatfs(cur_fd, &statfs) == -1) {
449 my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
452 if (min_free && (statfs.f_bavail < min_free)
453 && (cur_epoch==last_peak))
455 my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
460 assume that we can reclaim space by overwriting our own files
461 and that the difference in size will not fill the disk - sapan
466 /* If epoch length has been exceeded,
467 * or we're starting up
468 * or we're going back to the first epoch */
469 if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
471 prev_uptime = cur_uptime;
472 cur_epoch = (cur_epoch + 1) % log_epochs;
473 if (cur_epoch>last_peak) last_peak = cur_epoch;
476 /* Compress the finished file */
477 char gzip_cmd[MAX_PATH_LEN+sizeof("gzip -f ")];
478 snprintf(gzip_cmd, MAX_PATH_LEN+sizeof("gzip -f "),"gzip -f %s",cur_output_file);
481 snprintf(cur_output_file,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
482 if ((write_fd = open(cur_output_file, O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH)) < 0) {
483 my_log(LOG_ERR, "open(): %s (%s)\n", cur_output_file, strerror(errno));
486 if (fchmod(write_fd,S_IRUSR|S_IWUSR|S_IROTH|S_IRGRP) == -1) {
487 my_log(LOG_ERR, "fchmod() failed: %s (%s). Continuing...\n", cur_output_file, strerror(errno));
489 update_cur_epoch_file(cur_epoch);
498 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
500 struct Flow **flowpp;
506 if (prev) flowpp = *prev;
509 if (where->sip.s_addr == what->sip.s_addr
510 && where->dip.s_addr == what->dip.s_addr
511 && where->proto == what->proto) {
512 switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
514 /* Both unfragmented */
515 if ((what->sp == where->sp)
516 && (what->dp == where->dp)) goto done;
519 /* Both fragmented */
520 if (where->id == what->id) goto done;
524 flowpp = &where->next;
528 if (prev) *prev = flowpp;
532 int put_into(struct Flow *flow, int flag
533 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
540 struct Flow *flown, **flowpp;
541 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
546 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
547 sprintf(buf, " %x H:%04x", (unsigned) flow, h);
550 pthread_mutex_lock(&flows_mutex[h]);
552 if (!(flown = find(flows[h], flow, &flowpp))) {
553 /* No suitable flow found - add */
554 if (flag == COPY_INTO) {
555 if ((flown = mem_alloc())) {
556 copy_flow(flow, flown);
559 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
560 my_log(LOG_ERR, "%s %s. %s",
561 "mem_alloc():", strerror(errno), "packet lost");
566 flow->next = flows[h];
568 #if ((DEBUG) & DEBUG_I)
570 if (flow->flags & FLOW_FRAG) flows_fragmented++;
572 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
574 sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
579 /* Found suitable flow - update */
580 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
581 sprintf(buf, " +> %x", (unsigned) flown);
584 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
585 flown->mtime = flow->mtime;
586 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
587 flown->ctime = flow->ctime;
588 flown->tcp_flags |= flow->tcp_flags;
589 flown->size += flow->size;
590 flown->pkts += flow->pkts;
592 /* The slice_id of the first slice_id of a flow is misleading. Reset the slice_id of the flow
593 * if a better value comes along. A good example of this is that by the time CoDemux sets the
594 * peercred of a flow, it has already been accounted for here and attributed to root. */
596 if (flown->slice_id<1)
597 flown->slice_id = flow->slice_id;
600 if (flow->flags & FLOW_FRAG) {
601 /* Fragmented flow require some additional work */
602 if (flow->flags & FLOW_TL) {
605 Several packets with FLOW_TL (attack)
607 flown->sp = flow->sp;
608 flown->dp = flow->dp;
610 if (flow->flags & FLOW_LASTFRAG) {
613 Several packets with FLOW_LASTFRAG (attack)
615 flown->sizeP = flow->sizeP;
617 flown->flags |= flow->flags;
618 flown->sizeF += flow->sizeF;
619 if ((flown->flags & FLOW_LASTFRAG)
620 && (flown->sizeF >= flown->sizeP)) {
621 /* All fragments received - flow reassembled */
622 *flowpp = flown->next;
623 pthread_mutex_unlock(&flows_mutex[h]);
624 #if ((DEBUG) & DEBUG_I)
629 flown->flags &= ~FLOW_FRAG;
630 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
633 ret = put_into(flown, MOVE_INTO
634 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
640 if (flag == MOVE_INTO) mem_free(flow);
642 pthread_mutex_unlock(&flows_mutex[h]);
648 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
652 for (i = 0; i < fields; i++) {
653 #if ((DEBUG) & DEBUG_F)
654 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
657 case NETFLOW_IPV4_SRC_ADDR:
658 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
659 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
662 case NETFLOW_IPV4_DST_ADDR:
663 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
664 if ((flow->dip.s_addr == inet_addr("10.0.0.8"))) {
665 my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
667 p += NETFLOW_IPV4_DST_ADDR_SIZE;
670 case NETFLOW_INPUT_SNMP:
671 *((uint16_t *) p) = htons(flow->iif);
672 p += NETFLOW_INPUT_SNMP_SIZE;
675 case NETFLOW_OUTPUT_SNMP:
676 *((uint16_t *) p) = htons(flow->oif);
677 p += NETFLOW_OUTPUT_SNMP_SIZE;
680 case NETFLOW_PKTS_32:
681 *((uint32_t *) p) = htonl(flow->pkts);
682 p += NETFLOW_PKTS_32_SIZE;
685 case NETFLOW_BYTES_32:
686 *((uint32_t *) p) = htonl(flow->size);
687 p += NETFLOW_BYTES_32_SIZE;
690 case NETFLOW_FIRST_SWITCHED:
691 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
692 p += NETFLOW_FIRST_SWITCHED_SIZE;
695 case NETFLOW_LAST_SWITCHED:
696 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
697 p += NETFLOW_LAST_SWITCHED_SIZE;
700 case NETFLOW_L4_SRC_PORT:
701 *((uint16_t *) p) = flow->sp;
702 p += NETFLOW_L4_SRC_PORT_SIZE;
705 case NETFLOW_L4_DST_PORT:
706 *((uint16_t *) p) = flow->dp;
707 p += NETFLOW_L4_DST_PORT_SIZE;
711 *((uint8_t *) p) = flow->proto;
712 p += NETFLOW_PROT_SIZE;
715 case NETFLOW_SRC_TOS:
716 *((uint8_t *) p) = flow->tos;
717 p += NETFLOW_SRC_TOS_SIZE;
720 case NETFLOW_TCP_FLAGS:
721 *((uint8_t *) p) = flow->tcp_flags;
722 p += NETFLOW_TCP_FLAGS_SIZE;
725 case NETFLOW_VERSION:
726 *((uint16_t *) p) = htons(netflow->Version);
727 p += NETFLOW_VERSION_SIZE;
731 *((uint16_t *) p) = htons(emit_count);
732 p += NETFLOW_COUNT_SIZE;
736 *((uint32_t *) p) = htonl(getuptime(&emit_time));
737 p += NETFLOW_UPTIME_SIZE;
740 case NETFLOW_UNIX_SECS:
741 *((uint32_t *) p) = htonl(emit_time.sec);
742 p += NETFLOW_UNIX_SECS_SIZE;
745 case NETFLOW_UNIX_NSECS:
746 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
747 p += NETFLOW_UNIX_NSECS_SIZE;
750 case NETFLOW_FLOW_SEQUENCE:
751 //*((uint32_t *) p) = htonl(emit_sequence);
752 *((uint32_t *) p) = 0;
753 p += NETFLOW_FLOW_SEQUENCE_SIZE;
757 /* Unsupported (uint8_t) */
758 case NETFLOW_ENGINE_TYPE:
759 case NETFLOW_ENGINE_ID:
760 case NETFLOW_FLAGS7_1:
761 case NETFLOW_SRC_MASK:
762 case NETFLOW_DST_MASK:
764 my_log(LOG_CRIT, "Adding SRC/DST masks: this version of fprobe is seriously broken\n");
767 *((uint8_t *) p) = 0;
768 p += NETFLOW_PAD8_SIZE;
770 case NETFLOW_SLICE_ID:
771 *((uint32_t *) p) = flow->slice_id;
772 p += NETFLOW_SLICE_ID_SIZE;
775 /* Unsupported (uint16_t) */
778 case NETFLOW_FLAGS7_2:
779 *((uint16_t *) p) = 0;
780 p += NETFLOW_PAD16_SIZE;
784 /* Unsupported (uint32_t) */
785 case NETFLOW_IPV4_NEXT_HOP:
786 case NETFLOW_ROUTER_SC:
787 *((uint32_t *) p) = 0;
788 p += NETFLOW_PAD32_SIZE;
792 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
793 format, i, format[i]);
797 #if ((DEBUG) & DEBUG_F)
798 my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
805 Workaround for clone()-based threads
806 Try to change EUID independently of main thread
810 setregid(pw->pw_gid, pw->pw_gid);
811 setreuid(pw->pw_uid, pw->pw_uid);
820 struct timespec timeout;
821 int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
823 p = (void *) &emit_packet + netflow->HeaderSize;
829 pthread_mutex_lock(&emit_mutex);
830 while (!flows_emit) {
831 gettimeofday(&now, 0);
832 timeout.tv_sec = now.tv_sec + emit_timeout;
833 /* Do not wait until emit_packet will filled - it may be too long */
834 if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
835 //my_log(LOG_INFO,"Timeout: %d, %d",emit_count, timeout.tv_sec);
836 pthread_mutex_unlock(&emit_mutex);
841 flows_emit = flows_emit->next;
842 #if ((DEBUG) & DEBUG_I)
845 pthread_mutex_unlock(&emit_mutex);
849 gettime(&start_time);
850 start_time.sec -= start_time_offset;
853 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
857 printf("Emit count = %d\n", emit_count);
860 if (emit_count == netflow->MaxFlows) {
863 p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
864 size = netflow->HeaderSize + emit_count * netflow->FlowSize;
865 /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
866 #ifdef STD_NETFLOW_PDU
867 if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
870 for (i = 0; i < npeers; i++) {
871 if (peers[i].type == PEER_FILE) {
872 if (netflow->SeqOffset)
873 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
874 peers[i].write_fd = get_data_file_fd(peers[i].fname, peers[i].write_fd);
875 ret = write(peers[i].write_fd, emit_packet, size);
878 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
879 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
880 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
884 #if ((DEBUG) & DEBUG_E)
886 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
887 emit_count, i + 1, peers[i].seq);
890 peers[i].seq += emit_count;
893 if (emit_rate_bytes) {
895 delay = sent / emit_rate_bytes;
897 sent %= emit_rate_bytes;
899 timeout.tv_nsec = emit_rate_delay * delay;
900 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
905 if (peers[i].type == PEER_MIRROR) goto sendreal;
907 if (peers[i].type == PEER_ROTATE)
908 if (peer_rot_cur++ == peer_rot_work) {
910 if (netflow->SeqOffset)
911 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
912 ret = send(peers[i].write_fd, emit_packet, size, 0);
914 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
915 my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
916 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
919 #if ((DEBUG) & DEBUG_E)
921 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
922 emit_count, i + 1, peers[i].seq);
925 peers[i].seq += emit_count;
928 if (emit_rate_bytes) {
930 delay = sent / emit_rate_bytes;
932 sent %= emit_rate_bytes;
934 timeout.tv_nsec = emit_rate_delay * delay;
935 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
940 if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
941 emit_sequence += emit_count;
943 #if ((DEBUG) & DEBUG_I)
950 void *unpending_thread()
953 struct timespec timeout;
954 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
961 pthread_mutex_lock(&unpending_mutex);
964 while (!(pending_tail->flags & FLOW_PENDING)) {
965 gettimeofday(&now, 0);
966 timeout.tv_sec = now.tv_sec + unpending_timeout;
967 pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
970 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
973 if (put_into(pending_tail, COPY_INTO
974 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
978 #if ((DEBUG) & DEBUG_I)
979 pkts_lost_unpending++;
983 #if ((DEBUG) & DEBUG_U)
984 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
987 pending_tail->flags = 0;
988 pending_tail = pending_tail->next;
989 #if ((DEBUG) & DEBUG_I)
997 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1001 struct Flow *flow, **flowpp;
1003 struct timespec timeout;
1007 timeout.tv_nsec = 0;
1008 pthread_mutex_lock(&scan_mutex);
1012 timeout.tv_sec = now.sec + scan_interval;
1013 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
1016 #if ((DEBUG) & DEBUG_S)
1017 my_log(LOG_DEBUG, "S: %d", now.sec);
1019 for (i = 0; i < 1 << HASH_BITS ; i++) {
1020 pthread_mutex_lock(&flows_mutex[i]);
1024 if (flow->flags & FLOW_FRAG) {
1025 /* Process fragmented flow */
1026 if ((now.sec - flow->mtime.sec) > frag_lifetime) {
1027 /* Fragmented flow expired - put it into special chain */
1028 #if ((DEBUG) & DEBUG_I)
1032 *flowpp = flow->next;
1034 flow->flags &= ~FLOW_FRAG;
1035 flow->next = scan_frag_dreg;
1036 scan_frag_dreg = flow;
1041 /* Flow is not frgamented */
1042 if ((now.sec - flow->mtime.sec) > inactive_lifetime
1043 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
1045 #if ((DEBUG) & DEBUG_S)
1046 my_log(LOG_DEBUG, "S: E %x", flow);
1048 #if ((DEBUG) & DEBUG_I)
1051 *flowpp = flow->next;
1052 pthread_mutex_lock(&emit_mutex);
1053 flow->next = flows_emit;
1055 #if ((DEBUG) & DEBUG_I)
1058 pthread_mutex_unlock(&emit_mutex);
1063 flowpp = &flow->next;
1066 pthread_mutex_unlock(&flows_mutex[i]);
1068 if (flows_emit) pthread_cond_signal(&emit_cond);
1070 while (scan_frag_dreg) {
1071 flow = scan_frag_dreg;
1072 scan_frag_dreg = flow->next;
1073 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1076 put_into(flow, MOVE_INTO
1077 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1081 #if ((DEBUG) & DEBUG_S)
1082 my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
1090 struct ulog_packet_msg *ulog_msg;
1094 int len, off_frag, psize;
1095 #if ((DEBUG) & DEBUG_C)
1104 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1106 my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1109 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1111 #if ((DEBUG) & DEBUG_C)
1112 sprintf(logbuf, "C: %d", ulog_msg->data_len);
1115 nl = (void *) &ulog_msg->payload;
1116 psize = ulog_msg->data_len;
1119 if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1120 #if ((DEBUG) & DEBUG_C)
1121 strcat(logbuf, " U");
1122 my_log(LOG_DEBUG, "%s", logbuf);
1124 #if ((DEBUG) & DEBUG_I)
1130 if (pending_head->flags) {
1131 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1133 # if ((DEBUG) & DEBUG_C)
1138 "pending queue full:", "packet lost");
1140 #if ((DEBUG) & DEBUG_I)
1141 pkts_lost_capture++;
1146 #if ((DEBUG) & DEBUG_I)
1150 flow = pending_head;
1152 /* ?FIXME? Add sanity check for ip_len? */
1153 flow->size = ntohs(nl->ip_len);
1154 #if ((DEBUG) & DEBUG_I)
1155 size_total += flow->size;
1158 flow->sip = nl->ip_src;
1159 flow->dip = nl->ip_dst;
1160 flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1162 /* It's going to be expensive calling this syscall on every flow.
1163 * We should keep a local hash table, for now just bear the overhead... - Sapan*/
1165 flow->slice_id = ulog_msg->mark;
1167 /*if (flow->slice_id < 1)
1168 flow->slice_id = ulog_msg->mark; // Couldn't look up the slice id, let's at least store the local xid*/
1171 if ((flow->dip.s_addr == inet_addr("10.0.0.8")) || (flow->sip.s_addr == inet_addr("10.0.0.8"))) {
1172 my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->slice_id);
1174 flow->iif = snmp_index(ulog_msg->indev_name);
1175 flow->oif = snmp_index(ulog_msg->outdev_name);
1176 flow->proto = nl->ip_p;
1178 flow->tcp_flags = 0;
1182 /* Packets captured from OUTPUT table didn't contains valid timestamp */
1183 if (ulog_msg->timestamp_sec) {
1184 flow->ctime.sec = ulog_msg->timestamp_sec;
1185 flow->ctime.usec = ulog_msg->timestamp_usec;
1186 } else gettime(&flow->ctime);
1187 flow->mtime = flow->ctime;
1189 off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1192 Offset (from network layer) to transport layer header/IP data
1193 IOW IP header size ;-)
1196 Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1198 off_tl = nl->ip_hl << 2;
1199 tl = (void *) nl + off_tl;
1201 /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1202 flow->sizeF = ntohs(nl->ip_len) - off_tl;
1204 if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1205 if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1207 if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1208 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1209 #if ((DEBUG) & DEBUG_C)
1210 strcat(logbuf, " F");
1212 #if ((DEBUG) & DEBUG_I)
1213 pkts_total_fragmented++;
1215 flow->flags |= FLOW_FRAG;
1216 flow->id = nl->ip_id;
1218 if (!(ntohs(nl->ip_off) & IP_MF)) {
1219 /* Packet whith IP_MF contains information about whole datagram size */
1220 flow->flags |= FLOW_LASTFRAG;
1221 /* size = frag_offset*8 + data_size */
1222 flow->sizeP = off_frag + flow->sizeF;
1226 #if ((DEBUG) & DEBUG_C)
1227 sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1228 strcat(logbuf, buf);
1229 sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1230 strcat(logbuf, buf);
1234 Fortunately most interesting transport layer information fit
1235 into first 8 bytes of IP data field (minimal nonzero size).
1236 Thus we don't need actual packet reassembling to build whole
1237 transport layer data. We only check the fragment offset for
1238 zero value to find packet with this information.
1240 if (!off_frag && psize >= 8) {
1241 switch (flow->proto) {
1244 flow->sp = ((struct udphdr *)tl)->uh_sport;
1245 flow->dp = ((struct udphdr *)tl)->uh_dport;
1250 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1251 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1254 #ifdef ICMP_TRICK_CISCO
1256 flow->dp = *((int32_t *) tl);
1261 /* Unknown transport layer */
1262 #if ((DEBUG) & DEBUG_C)
1263 strcat(logbuf, " U");
1270 #if ((DEBUG) & DEBUG_C)
1271 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1272 strcat(logbuf, buf);
1274 flow->flags |= FLOW_TL;
1278 /* Check for tcp flags presence (including CWR and ECE). */
1279 if (flow->proto == IPPROTO_TCP
1281 && psize >= 16 - off_frag) {
1282 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1283 #if ((DEBUG) & DEBUG_C)
1284 sprintf(buf, " TCP:%x", flow->tcp_flags);
1285 strcat(logbuf, buf);
1289 #if ((DEBUG) & DEBUG_C)
1290 sprintf(buf, " => %x", (unsigned) flow);
1291 strcat(logbuf, buf);
1292 my_log(LOG_DEBUG, "%s", logbuf);
1295 #if ((DEBUG) & DEBUG_I)
1297 pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1298 if (pending_queue_trace < pending_queue_trace_candidate)
1299 pending_queue_trace = pending_queue_trace_candidate;
1302 /* Flow complete - inform unpending_thread() about it */
1303 pending_head->flags |= FLOW_PENDING;
1304 pending_head = pending_head->next;
1306 pthread_cond_signal(&unpending_cond);
1312 /* Copied out of CoDemux */
1314 static int init_daemon() {
1318 pidfile = fopen(PIDFILE, "w");
1319 if (pidfile == NULL) {
1320 my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
1323 if ((pid = fork()) < 0) {
1325 my_log(LOG_ERR, "Could not fork!\n");
1328 else if (pid != 0) {
1329 /* i'm the parent, writing down the child pid */
1330 fprintf(pidfile, "%u\n", pid);
1335 /* close the pid file */
1338 /* routines for any daemon process
1339 1. create a new session
1340 2. change directory to the root
1341 3. change the file creation permission
1344 chdir("/var/local/fprobe");
1350 int main(int argc, char **argv)
1353 char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1354 int c, i, write_fd, memory_limit = 0;
1355 struct addrinfo hints, *res;
1356 struct sockaddr_in saddr;
1357 pthread_attr_t tattr;
1358 struct sigaction sigact;
1359 static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1360 struct timeval timeout;
1362 sched_min = sched_get_priority_min(SCHED);
1363 sched_max = sched_get_priority_max(SCHED);
1365 memset(&saddr, 0 , sizeof(saddr));
1366 memset(&hints, 0 , sizeof(hints));
1367 hints.ai_flags = AI_PASSIVE;
1368 hints.ai_family = AF_INET;
1369 hints.ai_socktype = SOCK_DGRAM;
1371 /* Process command line options */
1374 while ((c = my_getopt(argc, argv, parms)) != -1) {
1384 if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1385 if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1386 if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1387 if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1388 if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1389 if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1390 if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1391 if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1392 if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1393 if (parms[nflag].count) {
1394 switch (atoi(parms[nflag].arg)) {
1396 netflow = &NetFlow1;
1403 netflow = &NetFlow7;
1407 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1411 if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1412 if (parms[lflag].count) {
1413 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1416 sprintf(errpbuf, "[%s]", log_suffix);
1417 strcat(ident, errpbuf);
1420 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1421 if (log_suffix) *--log_suffix = ':';
1423 if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1425 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1428 sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1429 if (parms[qflag].count) {
1430 pending_queue_length = atoi(parms[qflag].arg);
1431 if (pending_queue_length < 1) {
1432 fprintf(stderr, "Illegal %s\n", "pending queue length");
1436 if (parms[rflag].count) {
1437 schedp.sched_priority = atoi(parms[rflag].arg);
1438 if (schedp.sched_priority
1439 && (schedp.sched_priority < sched_min
1440 || schedp.sched_priority > sched_max)) {
1441 fprintf(stderr, "Illegal %s\n", "realtime priority");
1445 if (parms[Bflag].count) {
1446 sockbufsize = atoi(parms[Bflag].arg) << 10;
1448 if (parms[bflag].count) {
1449 bulk_quantity = atoi(parms[bflag].arg);
1450 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1451 fprintf(stderr, "Illegal %s\n", "bulk size");
1455 if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1456 if (parms[Xflag].count) {
1457 for(i = 0; parms[Xflag].arg[i]; i++)
1458 if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1459 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1461 rule = strtok(parms[Xflag].arg, ":");
1462 for (i = 0; rule; i++) {
1463 snmp_rules[i].len = strlen(rule);
1464 if (snmp_rules[i].len > IFNAMSIZ) {
1465 fprintf(stderr, "Illegal %s\n", "interface basename");
1468 strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1469 if (!*(rule - 1)) *(rule - 1) = ',';
1470 rule = strtok(NULL, ",");
1472 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1475 snmp_rules[i].base = atoi(rule);
1477 rule = strtok(NULL, ":");
1481 if (parms[tflag].count)
1482 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1483 if (parms[aflag].count) {
1484 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1486 fprintf(stderr, "Illegal %s\n", "source address");
1489 saddr = *((struct sockaddr_in *) res->ai_addr);
1493 if (parms[uflag].count)
1494 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1495 fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1500 /* Process collectors parameters. Brrrr... :-[ */
1502 npeers = argc - optind;
1504 /* Send to remote Netflow collector */
1505 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1506 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1508 if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1510 if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1511 fprintf(stderr, "socket(): %s\n", strerror(errno));
1514 peers[npeers].write_fd = write_fd;
1515 peers[npeers].type = PEER_MIRROR;
1516 peers[npeers].laddr = saddr;
1517 peers[npeers].seq = 0;
1518 if ((lhost = strchr(dport, '/'))) {
1520 if ((type = strchr(lhost, '/'))) {
1528 peers[npeers].type = PEER_ROTATE;
1537 if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1538 peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1542 if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1543 sizeof(struct sockaddr_in))) {
1544 fprintf(stderr, "bind(): %s\n", strerror(errno));
1547 if (getaddrinfo(dhost, dport, &hints, &res)) {
1549 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1552 peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1554 if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1555 sizeof(struct sockaddr_in))) {
1556 fprintf(stderr, "connect(): %s\n", strerror(errno));
1560 /* Restore command line */
1561 if (type) *--type = '/';
1562 if (lhost) *--lhost = '/';
1566 else if (parms[fflag].count) {
1568 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1569 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1570 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1572 peers[npeers].write_fd = START_DATA_FD;
1573 peers[npeers].type = PEER_FILE;
1574 peers[npeers].seq = 0;
1583 if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1584 ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1586 fprintf(stderr, "libipulog initialization error: %s",
1587 ipulog_strerror(ipulog_errno));
1591 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1592 &sockbufsize, sizeof(sockbufsize)) < 0)
1593 fprintf(stderr, "setsockopt(): %s", strerror(errno));
1595 /* Daemonize (if log destination stdout-free) */
1597 my_log_open(ident, verbosity, log_dest);
1601 if (!(log_dest & 2)) {
1602 /* Crash-proofing - Sapan*/
1606 fprintf(stderr, "fork(): %s", strerror(errno));
1611 freopen("/dev/null", "r", stdin);
1612 freopen("/dev/null", "w", stdout);
1613 freopen("/dev/null", "w", stderr);
1617 while (wait3(NULL,0,NULL) < 1);
1621 setvbuf(stdout, (char *)0, _IONBF, 0);
1622 setvbuf(stderr, (char *)0, _IONBF, 0);
1626 sprintf(errpbuf, "[%ld]", (long) pid);
1627 strcat(ident, errpbuf);
1629 /* Initialization */
1631 // init_slice_id_hash();
1632 hash_init(); /* Actually for crc16 only */
1633 mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1634 for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1637 /* Hope 12 days is enough :-/ */
1638 start_time_offset = 1 << 20;
1640 /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1642 gettime(&start_time);
1645 Build static pending queue as circular buffer.
1647 if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1648 pending_tail = pending_head;
1649 for (i = pending_queue_length - 1; i--;) {
1650 if (!(pending_tail->next = mem_alloc())) {
1652 my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1655 pending_tail = pending_tail->next;
1657 pending_tail->next = pending_head;
1658 pending_tail = pending_head;
1660 sigemptyset(&sig_mask);
1661 sigact.sa_handler = &sighandler;
1662 sigact.sa_mask = sig_mask;
1663 sigact.sa_flags = 0;
1664 sigaddset(&sig_mask, SIGTERM);
1665 sigaction(SIGTERM, &sigact, 0);
1666 #if ((DEBUG) & DEBUG_I)
1667 sigaddset(&sig_mask, SIGUSR1);
1668 sigaction(SIGUSR1, &sigact, 0);
1670 if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1671 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1675 my_log(LOG_INFO, "Starting %s...", VERSION);
1677 if (parms[cflag].count) {
1678 if (chdir(parms[cflag].arg) || chroot(".")) {
1679 my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1684 schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1685 pthread_attr_init(&tattr);
1686 for (i = 0; i < THREADS - 1; i++) {
1687 if (schedp.sched_priority > 0) {
1688 if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1689 (pthread_attr_setschedparam(&tattr, &schedp))) {
1690 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1694 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1695 my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1698 pthread_detach(thid);
1699 schedp.sched_priority++;
1703 if (setgroups(0, NULL)) {
1704 my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1707 if (setregid(pw->pw_gid, pw->pw_gid)) {
1708 my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1711 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1712 my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1717 if (!(pidfile = fopen(pidfilepath, "w")))
1718 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1720 fprintf(pidfile, "%ld\n", (long) pid);
1724 my_log(LOG_INFO, "pid: %d", pid);
1725 my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1726 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1727 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1728 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1729 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1730 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1731 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1732 for (i = 0; i < nsnmp_rules; i++) {
1733 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1734 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1736 for (i = 0; i < npeers; i++) {
1737 switch (peers[i].type) {
1745 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1746 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1747 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1750 pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1752 timeout.tv_usec = 0;
1754 || (total_elements - free_elements - pending_queue_length)
1756 || pending_tail->flags) {
1759 timeout.tv_sec = scan_interval;
1760 select(0, 0, 0, 0, &timeout);
1763 if (sigs & SIGTERM_MASK && !killed) {
1764 sigs &= ~SIGTERM_MASK;
1765 my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1768 active_lifetime = -1;
1769 inactive_lifetime = -1;
1771 unpending_timeout = 1;
1773 pthread_cond_signal(&scan_cond);
1774 pthread_cond_signal(&unpending_cond);
1777 #if ((DEBUG) & DEBUG_I)
1778 if (sigs & SIGUSR1_MASK) {
1779 sigs &= ~SIGUSR1_MASK;
1784 remove(pidfilepath);
1785 #if ((DEBUG) & DEBUG_I)
1788 my_log(LOG_INFO, "Done.");