2 Copyright (C) Slava Astashonok <sla@0n.ru>
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License.
7 $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
9 Sapan Bhatia <sapanb@cs.princeton.edu>
11 7/11/2007 Added data collection (-f) functionality, slice_id support in the header and log file
14 15/11/2007 Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
20 /* stdout, stderr, freopen() */
26 /* getopt(), alarm(), getpid(), setsid(), chdir() */
39 #include <libipulog/libipulog.h>
40 /* #include "vserver.h" */
42 struct ipulog_handle {
45 struct sockaddr_nl local;
46 struct sockaddr_nl peer;
47 struct nlmsghdr* last_nlhdr;
50 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
51 #include <sys/types.h>
52 #include <netinet/in_systm.h>
53 #include <sys/socket.h>
54 #include <netinet/in.h>
55 #include <arpa/inet.h>
56 #include <netinet/ip.h>
57 #include <netinet/tcp.h>
58 #include <netinet/udp.h>
59 #include <netinet/ip_icmp.h>
62 #include <sys/param.h>
87 #include <sys/select.h>
93 #include <fprobe-ulog.h>
95 #include <my_getopt.h>
100 #define PIDFILE "/var/log/fprobe-ulog.pid"
101 #define LAST_EPOCH_FILE "/var/log/fprobe_last_epoch"
102 #define MAX_EPOCH_SIZE sizeof("32767")
103 #define STD_NETFLOW_PDU
133 static struct getopt_parms parms[] = {
134 {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
135 {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
136 {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
137 {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
138 {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
139 {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
140 {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
141 {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
142 {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
143 {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
145 {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
146 {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
148 {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
149 {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
150 {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
151 {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
152 {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
153 {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
154 {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
155 {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
156 {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
157 {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
162 extern int optind, opterr, optopt;
165 extern struct NetFlow NetFlow1;
166 extern struct NetFlow NetFlow5;
167 extern struct NetFlow NetFlow7;
169 #define START_DATA_FD -5
170 #define mark_is_tos parms[Mflag].count
171 static unsigned scan_interval = 5;
172 static unsigned int min_free = 0;
173 static int frag_lifetime = 30;
174 static int inactive_lifetime = 60;
175 static int active_lifetime = 300;
176 static int sockbufsize;
177 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
178 #if (MEM_BITS == 0) || (MEM_BITS == 16)
179 #define BULK_QUANTITY 10000
181 #define BULK_QUANTITY 200
184 static unsigned epoch_length=60, log_epochs=1;
185 static unsigned cur_epoch=0,prev_uptime=0,last_peak=0;
187 static unsigned bulk_quantity = BULK_QUANTITY;
188 static unsigned pending_queue_length = 100;
189 static struct NetFlow *netflow = &NetFlow5;
190 static unsigned verbosity = 6;
191 static unsigned log_dest = MY_LOG_SYSLOG;
192 static struct Time start_time;
193 static long start_time_offset;
196 extern unsigned total_elements;
197 extern unsigned free_elements;
198 extern unsigned total_memory;
199 #if ((DEBUG) & DEBUG_I)
200 static unsigned emit_pkts, emit_queue;
201 static uint64_t size_total;
202 static unsigned pkts_total, pkts_total_fragmented;
203 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
204 static unsigned pkts_pending, pkts_pending_done;
205 static unsigned pending_queue_trace, pending_queue_trace_candidate;
206 static unsigned flows_total, flows_fragmented;
208 static unsigned emit_count;
209 static uint32_t emit_sequence;
210 static unsigned emit_rate_bytes, emit_rate_delay;
211 static struct Time emit_time;
212 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
213 static pthread_t thid;
214 static sigset_t sig_mask;
215 static struct sched_param schedp;
216 static int sched_min, sched_max;
217 static int npeers, npeers_rot;
218 static struct peer *peers;
220 static char cur_output_file[MAX_PATH_LEN];
222 static struct Flow *flows[1 << HASH_BITS];
223 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
225 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
226 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
228 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
229 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
230 static struct Flow *pending_head, *pending_tail;
231 static struct Flow *scan_frag_dreg;
233 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
234 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
235 static struct Flow *flows_emit;
237 static char ident[256] = "fprobe-ulog";
238 static FILE *pidfile;
239 static char *pidfilepath;
242 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
243 static struct ipulog_handle *ulog_handle;
244 static uint32_t ulog_gmask = 1;
245 static char *cap_buf;
246 static int nsnmp_rules;
247 static struct snmp_rule *snmp_rules;
248 static struct passwd *pw = 0;
253 "fprobe-ulog: a NetFlow probe. Version %s\n"
254 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
256 "-h\t\tDisplay this help\n"
257 "-U <mask>\tULOG group bitwise mask [1]\n"
258 "-s <seconds>\tHow often scan for expired flows [5]\n"
259 "-g <seconds>\tFragmented flow lifetime [30]\n"
260 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
261 "-f <filename>\tLog flow data in a file\n"
262 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
263 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
264 "-a <address>\tUse <address> as source for NetFlow flow\n"
265 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
266 "-M\t\tUse netfilter mark value as ToS flag\n"
267 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
268 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
269 "-q <flows>\tPending queue length [100]\n"
270 "-B <kilobytes>\tKernel capture buffer size [0]\n"
271 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
272 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
273 "-c <directory>\tDirectory to chroot to\n"
274 "-u <user>\tUser to run as\n"
275 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
276 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
277 "-y <remote:port>\tAddress of the NetFlow collector\n"
278 "-f <writable file>\tFile to write data into\n"
279 "-T <n>\tRotate log file every n epochs\n"
280 "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
281 "-E <[1..60]>\tSize of an epoch in minutes\n"
282 "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
284 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
288 #if ((DEBUG) & DEBUG_I)
291 my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
292 pkts_total, pkts_total_fragmented, size_total,
293 pkts_pending - pkts_pending_done, pending_queue_trace);
294 my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
295 pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
296 my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
297 flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
298 my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
299 total_elements, free_elements, total_memory);
303 void sighandler(int sig)
307 sigs |= SIGTERM_MASK;
309 #if ((DEBUG) & DEBUG_I)
311 sigs |= SIGUSR1_MASK;
317 void gettime(struct Time *now)
323 now->usec = t.tv_usec;
327 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
329 return (t1->sec - t2->sec)/60;
332 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
334 return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
337 /* Uptime in milliseconds */
338 uint32_t getuptime(struct Time *t)
340 /* Maximum uptime is about 49/2 days */
341 return cmpmtime(t, &start_time);
344 /* Uptime in minutes */
345 uint32_t getuptime_minutes(struct Time *t)
347 /* Maximum uptime is about 49/2 days */
348 return cmpMtime(t, &start_time);
351 hash_t hash_flow(struct Flow *flow)
353 if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
354 else return hash(flow, sizeof(struct Flow_TL));
357 uint16_t snmp_index(char *name) {
360 if (!*name) return 0;
362 for (i = 0; (int) i < nsnmp_rules; i++) {
363 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
364 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
367 if ((i = if_nametoindex(name))) return i;
372 inline void copy_flow(struct Flow *src, struct Flow *dst)
379 dst->slice_id = src->slice_id;
380 dst->proto = src->proto;
381 dst->tcp_flags = src->tcp_flags;
385 dst->pkts = src->pkts;
386 dst->size = src->size;
387 dst->sizeF = src->sizeF;
388 dst->sizeP = src->sizeP;
389 dst->ctime = src->ctime;
390 dst->mtime = src->mtime;
391 dst->flags = src->flags;
394 void read_cur_epoch() {
396 /* Reset to -1 in case the read fails */
398 fd = open(LAST_EPOCH_FILE, O_RDONLY);
400 char snum[MAX_EPOCH_SIZE];
402 len = read(fd, snum, MAX_EPOCH_SIZE-1);
405 sscanf(snum,"%d",&cur_epoch);
406 cur_epoch++; /* Let's not stone the last epoch */
414 /* Dumps the current epoch in a file to cope with
415 * reboots and killings of fprobe */
417 void update_cur_epoch_file(int n) {
419 char snum[MAX_EPOCH_SIZE];
420 len=snprintf(snum, MAX_EPOCH_SIZE-1,"%d", n);
421 fd = open(LAST_EPOCH_FILE, O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
423 my_log(LOG_ERR, "open() failed: %s.The next restart will resume logging from epoch id 0.",LAST_EPOCH_FILE);
426 write(fd, snum, len);
430 /* Get the file descriptor corresponding to the current file.
431 * The kludgy implementation is to abstract away the 'current
432 * file descriptor', which may also be a socket.
435 unsigned get_data_file_fd(char *fname, int cur_fd) {
439 struct statfs statfs;
442 /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
443 * doesn't solve the problem */
445 cur_uptime = getuptime_minutes(&now);
447 if (cur_fd != START_DATA_FD) {
448 if (fstatfs(cur_fd, &statfs) == -1) {
449 my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
452 if (min_free && (statfs.f_bavail < min_free)
453 && (cur_epoch==last_peak))
455 my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
460 assume that we can reclaim space by overwriting our own files
461 and that the difference in size will not fill the disk - sapan
466 /* If epoch length has been exceeded,
467 * or we're starting up
468 * or we're going back to the first epoch */
469 if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
471 prev_uptime = cur_uptime;
472 cur_epoch = (cur_epoch + 1) % log_epochs;
473 if (cur_epoch>last_peak) last_peak = cur_epoch;
476 /* Compress the finished file */
477 char gzip_cmd[MAX_PATH_LEN+sizeof("gzip -f ")];
478 snprintf(gzip_cmd, MAX_PATH_LEN+sizeof("gzip -f "),"gzip -f %s",cur_output_file);
481 snprintf(cur_output_file,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
482 if ((write_fd = open(cur_output_file, O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH)) < 0) {
483 my_log(LOG_ERR, "open(): %s (%s)\n", cur_output_file, strerror(errno));
486 if (fchmod(write_fd,S_IRUSR|S_IWUSR|S_IROTH|S_IRGRP) == -1) {
487 my_log(LOG_ERR, "fchmod() failed: %s (%s). Continuing...\n", cur_output_file, strerror(errno));
489 update_cur_epoch_file(cur_epoch);
498 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
500 struct Flow **flowpp;
506 if (prev) flowpp = *prev;
509 if (where->sip.s_addr == what->sip.s_addr
510 && where->dip.s_addr == what->dip.s_addr
511 && where->proto == what->proto) {
512 switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
514 /* Both unfragmented */
515 if ((what->sp == where->sp)
516 && (what->dp == where->dp)) goto done;
519 /* Both fragmented */
520 if (where->id == what->id) goto done;
524 flowpp = &where->next;
528 if (prev) *prev = flowpp;
532 int put_into(struct Flow *flow, int flag
533 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
540 struct Flow *flown, **flowpp;
541 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
546 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
547 sprintf(buf, " %x H:%04x", (unsigned) flow, h);
550 pthread_mutex_lock(&flows_mutex[h]);
552 if (!(flown = find(flows[h], flow, &flowpp))) {
553 /* No suitable flow found - add */
554 if (flag == COPY_INTO) {
555 if ((flown = mem_alloc())) {
556 copy_flow(flow, flown);
559 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
560 my_log(LOG_ERR, "%s %s. %s",
561 "mem_alloc():", strerror(errno), "packet lost");
566 flow->next = flows[h];
568 #if ((DEBUG) & DEBUG_I)
570 if (flow->flags & FLOW_FRAG) flows_fragmented++;
572 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
574 sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
579 /* Found suitable flow - update */
580 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
581 sprintf(buf, " +> %x", (unsigned) flown);
584 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
585 flown->mtime = flow->mtime;
586 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
587 flown->ctime = flow->ctime;
588 flown->tcp_flags |= flow->tcp_flags;
589 flown->size += flow->size;
590 flown->pkts += flow->pkts;
592 /* The slice_id of the first packet of a flow is misleading. Reset the slice_id of the flow
593 * if a better value comes along. A good example of this is that by the time CoDemux sets the
594 * peercred of a flow, it has already been accounted for here and attributed to root. */
596 if (flown->slice_id<1)
597 flown->slice_id = flow->slice_id;
600 if (flow->flags & FLOW_FRAG) {
601 /* Fragmented flow require some additional work */
602 if (flow->flags & FLOW_TL) {
605 Several packets with FLOW_TL (attack)
607 flown->sp = flow->sp;
608 flown->dp = flow->dp;
610 if (flow->flags & FLOW_LASTFRAG) {
613 Several packets with FLOW_LASTFRAG (attack)
615 flown->sizeP = flow->sizeP;
617 flown->flags |= flow->flags;
618 flown->sizeF += flow->sizeF;
619 if ((flown->flags & FLOW_LASTFRAG)
620 && (flown->sizeF >= flown->sizeP)) {
621 /* All fragments received - flow reassembled */
622 *flowpp = flown->next;
623 pthread_mutex_unlock(&flows_mutex[h]);
624 #if ((DEBUG) & DEBUG_I)
629 flown->flags &= ~FLOW_FRAG;
630 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
633 ret = put_into(flown, MOVE_INTO
634 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
640 if (flag == MOVE_INTO) mem_free(flow);
642 pthread_mutex_unlock(&flows_mutex[h]);
648 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
652 for (i = 0; i < fields; i++) {
653 #if ((DEBUG) & DEBUG_F)
654 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
657 case NETFLOW_IPV4_SRC_ADDR:
658 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
659 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
662 case NETFLOW_IPV4_DST_ADDR:
663 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
664 if ((flow->dip.s_addr == inet_addr("10.0.0.8"))) {
665 my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
667 p += NETFLOW_IPV4_DST_ADDR_SIZE;
670 case NETFLOW_INPUT_SNMP:
671 *((uint16_t *) p) = htons(flow->iif);
672 p += NETFLOW_INPUT_SNMP_SIZE;
675 case NETFLOW_OUTPUT_SNMP:
676 *((uint16_t *) p) = htons(flow->oif);
677 p += NETFLOW_OUTPUT_SNMP_SIZE;
680 case NETFLOW_PKTS_32:
681 *((uint32_t *) p) = htonl(flow->pkts);
682 p += NETFLOW_PKTS_32_SIZE;
685 case NETFLOW_BYTES_32:
686 *((uint32_t *) p) = htonl(flow->size);
687 p += NETFLOW_BYTES_32_SIZE;
690 case NETFLOW_FIRST_SWITCHED:
691 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
692 p += NETFLOW_FIRST_SWITCHED_SIZE;
695 case NETFLOW_LAST_SWITCHED:
696 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
697 p += NETFLOW_LAST_SWITCHED_SIZE;
700 case NETFLOW_L4_SRC_PORT:
701 *((uint16_t *) p) = flow->sp;
702 p += NETFLOW_L4_SRC_PORT_SIZE;
705 case NETFLOW_L4_DST_PORT:
706 *((uint16_t *) p) = flow->dp;
707 p += NETFLOW_L4_DST_PORT_SIZE;
711 *((uint8_t *) p) = flow->proto;
712 p += NETFLOW_PROT_SIZE;
715 case NETFLOW_SRC_TOS:
716 *((uint8_t *) p) = flow->tos;
717 p += NETFLOW_SRC_TOS_SIZE;
720 case NETFLOW_TCP_FLAGS:
721 *((uint8_t *) p) = flow->tcp_flags;
722 p += NETFLOW_TCP_FLAGS_SIZE;
725 case NETFLOW_VERSION:
726 *((uint16_t *) p) = htons(netflow->Version);
727 p += NETFLOW_VERSION_SIZE;
731 *((uint16_t *) p) = htons(emit_count);
732 p += NETFLOW_COUNT_SIZE;
736 *((uint32_t *) p) = htonl(getuptime(&emit_time));
737 p += NETFLOW_UPTIME_SIZE;
740 case NETFLOW_UNIX_SECS:
741 *((uint32_t *) p) = htonl(emit_time.sec);
742 p += NETFLOW_UNIX_SECS_SIZE;
745 case NETFLOW_UNIX_NSECS:
746 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
747 p += NETFLOW_UNIX_NSECS_SIZE;
750 case NETFLOW_FLOW_SEQUENCE:
751 //*((uint32_t *) p) = htonl(emit_sequence);
752 *((uint32_t *) p) = 0;
753 p += NETFLOW_FLOW_SEQUENCE_SIZE;
757 /* Unsupported (uint8_t) */
758 case NETFLOW_ENGINE_TYPE:
759 case NETFLOW_ENGINE_ID:
760 case NETFLOW_FLAGS7_1:
761 case NETFLOW_SRC_MASK:
762 case NETFLOW_DST_MASK:
764 my_log(LOG_CRIT, "Adding SRC/DST masks: this version of fprobe is seriously broken\n");
767 *((uint8_t *) p) = 0;
768 p += NETFLOW_PAD8_SIZE;
770 case NETFLOW_SLICE_ID:
771 *((uint32_t *) p) = flow->slice_id;
772 p += NETFLOW_SLICE_ID_SIZE;
775 /* Unsupported (uint16_t) */
778 case NETFLOW_FLAGS7_2:
779 *((uint16_t *) p) = 0;
780 p += NETFLOW_PAD16_SIZE;
784 /* Unsupported (uint32_t) */
785 case NETFLOW_IPV4_NEXT_HOP:
786 case NETFLOW_ROUTER_SC:
787 *((uint32_t *) p) = 0;
788 p += NETFLOW_PAD32_SIZE;
792 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
793 format, i, format[i]);
797 #if ((DEBUG) & DEBUG_F)
798 my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
805 Workaround for clone()-based threads
806 Try to change EUID independently of main thread
810 setregid(pw->pw_gid, pw->pw_gid);
811 setreuid(pw->pw_uid, pw->pw_uid);
820 struct timespec timeout;
821 int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
823 p = (void *) &emit_packet + netflow->HeaderSize;
829 pthread_mutex_lock(&emit_mutex);
830 while (!flows_emit) {
831 gettimeofday(&now, 0);
832 timeout.tv_sec = now.tv_sec + emit_timeout;
833 /* Do not wait until emit_packet is filled - it may take too long */
835 while ((res=pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout))==-1) continue;
837 if (res && emit_count) {
838 //my_log(LOG_INFO,"Timeout: %d, %d",emit_count, timeout.tv_sec);
839 pthread_mutex_unlock(&emit_mutex);
844 flows_emit = flows_emit->next;
845 #if ((DEBUG) & DEBUG_I)
848 pthread_mutex_unlock(&emit_mutex);
852 gettime(&start_time);
853 start_time.sec -= start_time_offset;
856 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
860 printf("Emit count = %d\n", emit_count);
863 if (emit_count == netflow->MaxFlows) {
866 p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
867 size = netflow->HeaderSize + emit_count * netflow->FlowSize;
868 /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
869 #ifdef STD_NETFLOW_PDU
870 if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
873 for (i = 0; i < npeers; i++) {
874 if (peers[i].type == PEER_FILE) {
875 if (netflow->SeqOffset)
876 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
877 peers[i].write_fd = get_data_file_fd(peers[i].fname, peers[i].write_fd);
878 ret = write(peers[i].write_fd, emit_packet, size);
881 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
882 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
883 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
887 #if ((DEBUG) & DEBUG_E)
889 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
890 emit_count, i + 1, peers[i].seq);
893 peers[i].seq += emit_count;
896 if (emit_rate_bytes) {
898 delay = sent / emit_rate_bytes;
900 sent %= emit_rate_bytes;
902 timeout.tv_nsec = emit_rate_delay * delay;
903 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
908 if (peers[i].type == PEER_MIRROR) goto sendreal;
910 if (peers[i].type == PEER_ROTATE)
911 if (peer_rot_cur++ == peer_rot_work) {
913 if (netflow->SeqOffset)
914 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
915 ret = send(peers[i].write_fd, emit_packet, size, 0);
917 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
918 my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
919 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
922 #if ((DEBUG) & DEBUG_E)
924 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
925 emit_count, i + 1, peers[i].seq);
928 peers[i].seq += emit_count;
931 if (emit_rate_bytes) {
933 delay = sent / emit_rate_bytes;
935 sent %= emit_rate_bytes;
937 timeout.tv_nsec = emit_rate_delay * delay;
938 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
943 if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
944 emit_sequence += emit_count;
946 #if ((DEBUG) & DEBUG_I)
953 void *unpending_thread()
956 struct timespec timeout;
957 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
964 pthread_mutex_lock(&unpending_mutex);
967 while (!(pending_tail->flags & FLOW_PENDING)) {
968 gettimeofday(&now, 0);
969 timeout.tv_sec = now.tv_sec + unpending_timeout;
970 while (pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout)==-1)
974 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
977 if (put_into(pending_tail, COPY_INTO
978 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
982 #if ((DEBUG) & DEBUG_I)
983 pkts_lost_unpending++;
987 #if ((DEBUG) & DEBUG_U)
988 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
991 pending_tail->flags = 0;
992 pending_tail = pending_tail->next;
993 #if ((DEBUG) & DEBUG_I)
1001 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1005 struct Flow *flow, **flowpp;
1007 struct timespec timeout;
1011 timeout.tv_nsec = 0;
1012 pthread_mutex_lock(&scan_mutex);
1016 timeout.tv_sec = now.sec + scan_interval;
1017 while (pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout)==-1)
1021 #if ((DEBUG) & DEBUG_S)
1022 my_log(LOG_DEBUG, "S: %d", now.sec);
1024 for (i = 0; i < 1 << HASH_BITS ; i++) {
1025 pthread_mutex_lock(&flows_mutex[i]);
1029 if (flow->flags & FLOW_FRAG) {
1030 /* Process fragmented flow */
1031 if ((now.sec - flow->mtime.sec) > frag_lifetime) {
1032 /* Fragmented flow expired - put it into special chain */
1033 #if ((DEBUG) & DEBUG_I)
1037 *flowpp = flow->next;
1039 flow->flags &= ~FLOW_FRAG;
1040 flow->next = scan_frag_dreg;
1041 scan_frag_dreg = flow;
1046 /* Flow is not fragmented */
1047 if ((now.sec - flow->mtime.sec) > inactive_lifetime
1048 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
1050 #if ((DEBUG) & DEBUG_S)
1051 my_log(LOG_DEBUG, "S: E %x", flow);
1053 #if ((DEBUG) & DEBUG_I)
1056 *flowpp = flow->next;
1057 pthread_mutex_lock(&emit_mutex);
1058 flow->next = flows_emit;
1060 #if ((DEBUG) & DEBUG_I)
1063 pthread_mutex_unlock(&emit_mutex);
1068 flowpp = &flow->next;
1071 pthread_mutex_unlock(&flows_mutex[i]);
1073 if (flows_emit) pthread_cond_signal(&emit_cond);
1075 while (scan_frag_dreg) {
1076 flow = scan_frag_dreg;
1077 scan_frag_dreg = flow->next;
1078 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1081 put_into(flow, MOVE_INTO
1082 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1086 #if ((DEBUG) & DEBUG_S)
1087 my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
1095 struct ulog_packet_msg *ulog_msg;
1099 int len, off_frag, psize;
1100 #if ((DEBUG) & DEBUG_C)
1109 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1111 my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1114 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1116 #if ((DEBUG) & DEBUG_C)
1117 sprintf(logbuf, "C: %d", ulog_msg->data_len);
1120 nl = (void *) &ulog_msg->payload;
1121 psize = ulog_msg->data_len;
1124 if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1125 #if ((DEBUG) & DEBUG_C)
1126 strcat(logbuf, " U");
1127 my_log(LOG_DEBUG, "%s", logbuf);
1129 #if ((DEBUG) & DEBUG_I)
1135 if (pending_head->flags) {
1136 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1138 # if ((DEBUG) & DEBUG_C)
1143 "pending queue full:", "packet lost");
1145 #if ((DEBUG) & DEBUG_I)
1146 pkts_lost_capture++;
1151 #if ((DEBUG) & DEBUG_I)
1155 flow = pending_head;
1157 /* ?FIXME? Add sanity check for ip_len? */
1158 flow->size = ntohs(nl->ip_len);
1159 #if ((DEBUG) & DEBUG_I)
1160 size_total += flow->size;
1163 flow->sip = nl->ip_src;
1164 flow->dip = nl->ip_dst;
1165 flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1167 /* It's going to be expensive calling this syscall on every flow.
1168 * We should keep a local hash table, for now just bear the overhead... - Sapan*/
1170 flow->slice_id = ulog_msg->mark;
1172 /*if (flow->slice_id < 1)
1173 flow->slice_id = ulog_msg->mark; // Couldn't look up the slice id, let's at least store the local xid*/
1176 if ((flow->dip.s_addr == inet_addr("10.0.0.8")) || (flow->sip.s_addr == inet_addr("10.0.0.8"))) {
1177 my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->slice_id);
1179 flow->iif = snmp_index(ulog_msg->indev_name);
1180 flow->oif = snmp_index(ulog_msg->outdev_name);
1181 flow->proto = nl->ip_p;
1183 flow->tcp_flags = 0;
1187 /* Packets captured from the OUTPUT table do not contain a valid timestamp */
1188 if (ulog_msg->timestamp_sec) {
1189 flow->ctime.sec = ulog_msg->timestamp_sec;
1190 flow->ctime.usec = ulog_msg->timestamp_usec;
1191 } else gettime(&flow->ctime);
1192 flow->mtime = flow->ctime;
1194 off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1197 Offset (from network layer) to transport layer header/IP data
1198 IOW IP header size ;-)
1201 Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1203 off_tl = nl->ip_hl << 2;
1204 tl = (void *) nl + off_tl;
1206 /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1207 flow->sizeF = ntohs(nl->ip_len) - off_tl;
1209 if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1210 if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1212 if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1213 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1214 #if ((DEBUG) & DEBUG_C)
1215 strcat(logbuf, " F");
1217 #if ((DEBUG) & DEBUG_I)
1218 pkts_total_fragmented++;
1220 flow->flags |= FLOW_FRAG;
1221 flow->id = nl->ip_id;
1223 if (!(ntohs(nl->ip_off) & IP_MF)) {
1224 /* A packet with IP_MF cleared (the last fragment) reveals the whole datagram size */
1225 flow->flags |= FLOW_LASTFRAG;
1226 /* size = frag_offset*8 + data_size */
1227 flow->sizeP = off_frag + flow->sizeF;
1231 #if ((DEBUG) & DEBUG_C)
1232 sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1233 strcat(logbuf, buf);
1234 sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1235 strcat(logbuf, buf);
1239 Fortunately most interesting transport layer information fit
1240 into first 8 bytes of IP data field (minimal nonzero size).
1241 Thus we don't need actual packet reassembling to build whole
1242 transport layer data. We only check the fragment offset for
1243 zero value to find packet with this information.
1245 if (!off_frag && psize >= 8) {
1246 switch (flow->proto) {
1249 flow->sp = ((struct udphdr *)tl)->uh_sport;
1250 flow->dp = ((struct udphdr *)tl)->uh_dport;
1255 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1256 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1259 #ifdef ICMP_TRICK_CISCO
1261 flow->dp = *((int32_t *) tl);
1266 /* Unknown transport layer */
1267 #if ((DEBUG) & DEBUG_C)
1268 strcat(logbuf, " U");
1275 #if ((DEBUG) & DEBUG_C)
1276 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1277 strcat(logbuf, buf);
1279 flow->flags |= FLOW_TL;
1283 /* Check for tcp flags presence (including CWR and ECE). */
1284 if (flow->proto == IPPROTO_TCP
1286 && psize >= 16 - off_frag) {
1287 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1288 #if ((DEBUG) & DEBUG_C)
1289 sprintf(buf, " TCP:%x", flow->tcp_flags);
1290 strcat(logbuf, buf);
1294 #if ((DEBUG) & DEBUG_C)
1295 sprintf(buf, " => %x", (unsigned) flow);
1296 strcat(logbuf, buf);
1297 my_log(LOG_DEBUG, "%s", logbuf);
1300 #if ((DEBUG) & DEBUG_I)
1302 pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1303 if (pending_queue_trace < pending_queue_trace_candidate)
1304 pending_queue_trace = pending_queue_trace_candidate;
1307 /* Flow complete - inform unpending_thread() about it */
1308 pending_head->flags |= FLOW_PENDING;
1309 pending_head = pending_head->next;
1311 pthread_cond_signal(&unpending_cond);
1317 /* Copied out of CoDemux */
1319 static int init_daemon() {
1323 pidfile = fopen(PIDFILE, "w");
1324 if (pidfile == NULL) {
1325 my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
1328 if ((pid = fork()) < 0) {
1330 my_log(LOG_ERR, "Could not fork!\n");
1333 else if (pid != 0) {
1334 /* i'm the parent, writing down the child pid */
1335 fprintf(pidfile, "%u\n", pid);
1340 /* close the pid file */
1343 /* routines for any daemon process
1344 1. create a new session
1345 2. change directory to the root
1346 3. change the file creation permission
1349 chdir("/var/local/fprobe");
1355 int main(int argc, char **argv)
1358 char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1359 int c, i, write_fd, memory_limit = 0;
1360 struct addrinfo hints, *res;
1361 struct sockaddr_in saddr;
1362 pthread_attr_t tattr;
1363 struct sigaction sigact;
1364 static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1365 struct timeval timeout;
1367 sched_min = sched_get_priority_min(SCHED);
1368 sched_max = sched_get_priority_max(SCHED);
1370 memset(&saddr, 0 , sizeof(saddr));
1371 memset(&hints, 0 , sizeof(hints));
1372 hints.ai_flags = AI_PASSIVE;
1373 hints.ai_family = AF_INET;
1374 hints.ai_socktype = SOCK_DGRAM;
1376 /* Process command line options */
1379 while ((c = my_getopt(argc, argv, parms)) != -1) {
1389 if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1390 if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1391 if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1392 if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1393 if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1394 if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1395 if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1396 if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1397 if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1398 if (parms[nflag].count) {
1399 switch (atoi(parms[nflag].arg)) {
1401 netflow = &NetFlow1;
1408 netflow = &NetFlow7;
1412 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1416 if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1417 if (parms[lflag].count) {
1418 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1421 sprintf(errpbuf, "[%s]", log_suffix);
1422 strcat(ident, errpbuf);
1425 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1426 if (log_suffix) *--log_suffix = ':';
1428 if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1430 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1433 sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1434 if (parms[qflag].count) {
1435 pending_queue_length = atoi(parms[qflag].arg);
1436 if (pending_queue_length < 1) {
1437 fprintf(stderr, "Illegal %s\n", "pending queue length");
1441 if (parms[rflag].count) {
1442 schedp.sched_priority = atoi(parms[rflag].arg);
1443 if (schedp.sched_priority
1444 && (schedp.sched_priority < sched_min
1445 || schedp.sched_priority > sched_max)) {
1446 fprintf(stderr, "Illegal %s\n", "realtime priority");
1450 if (parms[Bflag].count) {
1451 sockbufsize = atoi(parms[Bflag].arg) << 10;
1453 if (parms[bflag].count) {
1454 bulk_quantity = atoi(parms[bflag].arg);
1455 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1456 fprintf(stderr, "Illegal %s\n", "bulk size");
1460 if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1461 if (parms[Xflag].count) {
1462 for(i = 0; parms[Xflag].arg[i]; i++)
1463 if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1464 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1466 rule = strtok(parms[Xflag].arg, ":");
1467 for (i = 0; rule; i++) {
1468 snmp_rules[i].len = strlen(rule);
1469 if (snmp_rules[i].len > IFNAMSIZ) {
1470 fprintf(stderr, "Illegal %s\n", "interface basename");
1473 strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1474 if (!*(rule - 1)) *(rule - 1) = ',';
1475 rule = strtok(NULL, ",");
1477 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1480 snmp_rules[i].base = atoi(rule);
1482 rule = strtok(NULL, ":");
1486 if (parms[tflag].count)
1487 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1488 if (parms[aflag].count) {
1489 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1491 fprintf(stderr, "Illegal %s\n", "source address");
1494 saddr = *((struct sockaddr_in *) res->ai_addr);
1498 if (parms[uflag].count)
1499 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1500 fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1505 /* Process collectors parameters. Brrrr... :-[ */
1507 npeers = argc - optind;
1509 /* Send to remote Netflow collector */
1510 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1511 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1513 if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1515 if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1516 fprintf(stderr, "socket(): %s\n", strerror(errno));
1519 peers[npeers].write_fd = write_fd;
1520 peers[npeers].type = PEER_MIRROR;
1521 peers[npeers].laddr = saddr;
1522 peers[npeers].seq = 0;
1523 if ((lhost = strchr(dport, '/'))) {
1525 if ((type = strchr(lhost, '/'))) {
1533 peers[npeers].type = PEER_ROTATE;
1542 if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1543 peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1547 if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1548 sizeof(struct sockaddr_in))) {
1549 fprintf(stderr, "bind(): %s\n", strerror(errno));
1552 if (getaddrinfo(dhost, dport, &hints, &res)) {
1554 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1557 peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1559 if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1560 sizeof(struct sockaddr_in))) {
1561 fprintf(stderr, "connect(): %s\n", strerror(errno));
1565 /* Restore command line */
1566 if (type) *--type = '/';
1567 if (lhost) *--lhost = '/';
1571 else if (parms[fflag].count) {
1573 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1574 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1575 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1577 peers[npeers].write_fd = START_DATA_FD;
1578 peers[npeers].type = PEER_FILE;
1579 peers[npeers].seq = 0;
1588 if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1589 ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1591 fprintf(stderr, "libipulog initialization error: %s",
1592 ipulog_strerror(ipulog_errno));
1596 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1597 &sockbufsize, sizeof(sockbufsize)) < 0)
1598 fprintf(stderr, "setsockopt(): %s", strerror(errno));
1600 /* Daemonize (if log destination stdout-free) */
1602 my_log_open(ident, verbosity, log_dest);
1606 if (!(log_dest & 2)) {
1607 /* Crash-proofing - Sapan*/
1611 fprintf(stderr, "fork(): %s", strerror(errno));
1616 freopen("/dev/null", "r", stdin);
1617 freopen("/dev/null", "w", stdout);
1618 freopen("/dev/null", "w", stderr);
1622 while (wait3(NULL,0,NULL) < 1);
1626 setvbuf(stdout, (char *)0, _IONBF, 0);
1627 setvbuf(stderr, (char *)0, _IONBF, 0);
1631 sprintf(errpbuf, "[%ld]", (long) pid);
1632 strcat(ident, errpbuf);
1634 /* Initialization */
1636 // init_slice_id_hash();
1637 hash_init(); /* Actually for crc16 only */
1638 mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1639 for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1642 /* Hope 12 days is enough :-/ */
1643 start_time_offset = 1 << 20;
1645 /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1647 gettime(&start_time);
1650 Build static pending queue as circular buffer.
1652 if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1653 pending_tail = pending_head;
1654 for (i = pending_queue_length - 1; i--;) {
1655 if (!(pending_tail->next = mem_alloc())) {
1657 my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1660 pending_tail = pending_tail->next;
1662 pending_tail->next = pending_head;
1663 pending_tail = pending_head;
1665 sigemptyset(&sig_mask);
1666 sigact.sa_handler = &sighandler;
1667 sigact.sa_mask = sig_mask;
1668 sigact.sa_flags = 0;
1669 sigaddset(&sig_mask, SIGTERM);
1670 sigaction(SIGTERM, &sigact, 0);
1671 #if ((DEBUG) & DEBUG_I)
1672 sigaddset(&sig_mask, SIGUSR1);
1673 sigaction(SIGUSR1, &sigact, 0);
1675 if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1676 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1680 my_log(LOG_INFO, "Starting %s...", VERSION);
1682 if (parms[cflag].count) {
1683 if (chdir(parms[cflag].arg) || chroot(".")) {
1684 my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1689 schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1690 pthread_attr_init(&tattr);
1691 for (i = 0; i < THREADS - 1; i++) {
1692 if (schedp.sched_priority > 0) {
1693 if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1694 (pthread_attr_setschedparam(&tattr, &schedp))) {
1695 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1699 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1700 my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1703 pthread_detach(thid);
1704 schedp.sched_priority++;
1708 if (setgroups(0, NULL)) {
1709 my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1712 if (setregid(pw->pw_gid, pw->pw_gid)) {
1713 my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1716 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1717 my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1722 if (!(pidfile = fopen(pidfilepath, "w")))
1723 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1725 fprintf(pidfile, "%ld\n", (long) pid);
1729 my_log(LOG_INFO, "pid: %d", pid);
1730 my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1731 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1732 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1733 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1734 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1735 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1736 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1737 for (i = 0; i < nsnmp_rules; i++) {
1738 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1739 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1741 for (i = 0; i < npeers; i++) {
1742 switch (peers[i].type) {
1750 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1751 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1752 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1755 pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1757 timeout.tv_usec = 0;
1759 || (total_elements - free_elements - pending_queue_length)
1761 || pending_tail->flags) {
1764 timeout.tv_sec = scan_interval;
1765 select(0, 0, 0, 0, &timeout);
1768 if (sigs & SIGTERM_MASK && !killed) {
1769 sigs &= ~SIGTERM_MASK;
1770 my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1773 active_lifetime = -1;
1774 inactive_lifetime = -1;
1776 unpending_timeout = 1;
1778 pthread_cond_signal(&scan_cond);
1779 pthread_cond_signal(&unpending_cond);
1782 #if ((DEBUG) & DEBUG_I)
1783 if (sigs & SIGUSR1_MASK) {
1784 sigs &= ~SIGUSR1_MASK;
1789 remove(pidfilepath);
1790 #if ((DEBUG) & DEBUG_I)
1793 my_log(LOG_INFO, "Done.");