2 Copyright (C) Slava Astashonok <sla@0n.ru>
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License.
7 $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
9 Sapan Bhatia <sapanb@cs.princeton.edu>
11 7/11/2007 Added data collection (-f) functionality, slice_id support in the header and log file
14 15/11/2007 Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
20 /* stdout, stderr, freopen() */
26 /* getopt(), alarm(), getpid(), setsid(), chdir() */
39 #include <libipulog/libipulog.h>
42 struct ipulog_handle {
45 struct sockaddr_nl local;
46 struct sockaddr_nl peer;
47 struct nlmsghdr* last_nlhdr;
50 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
51 #include <sys/types.h>
52 #include <netinet/in_systm.h>
53 #include <sys/socket.h>
54 #include <netinet/in.h>
55 #include <arpa/inet.h>
56 #include <netinet/ip.h>
57 #include <netinet/tcp.h>
58 #include <netinet/udp.h>
59 #include <netinet/ip_icmp.h>
62 #include <sys/param.h>
87 #include <sys/select.h>
93 #include <fprobe-ulog.h>
95 #include <my_getopt.h>
100 #define PIDFILE "/var/log/fprobe-ulog.pid"
101 #define LAST_EPOCH_FILE "/var/log/fprobe_last_epoch"
102 #define MAX_EPOCH_SIZE sizeof("32767")
103 #define STD_NETFLOW_PDU
133 static struct getopt_parms parms[] = {
134 {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
135 {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
136 {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
137 {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
138 {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
139 {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
140 {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
141 {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
142 {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
143 {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
145 {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
146 {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
148 {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
149 {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
150 {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
151 {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
152 {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
153 {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
154 {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
155 {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
156 {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
157 {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
162 extern int optind, opterr, optopt;
165 extern struct NetFlow NetFlow1;
166 extern struct NetFlow NetFlow5;
167 extern struct NetFlow NetFlow7;
169 #define START_DATA_FD -5
170 #define mark_is_tos parms[Mflag].count
171 static unsigned scan_interval = 5;
172 static unsigned int min_free = 0;
173 static int frag_lifetime = 30;
174 static int inactive_lifetime = 60;
175 static int active_lifetime = 300;
176 static int sockbufsize;
177 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
178 #if (MEM_BITS == 0) || (MEM_BITS == 16)
179 #define BULK_QUANTITY 10000
181 #define BULK_QUANTITY 200
184 static unsigned epoch_length=60, log_epochs=1;
185 static unsigned cur_epoch=0,prev_uptime=0,last_peak=0;
187 static unsigned bulk_quantity = BULK_QUANTITY;
188 static unsigned pending_queue_length = 100;
189 static struct NetFlow *netflow = &NetFlow5;
190 static unsigned verbosity = 6;
191 static unsigned log_dest = MY_LOG_SYSLOG;
192 static struct Time start_time;
193 static long start_time_offset;
196 extern unsigned total_elements;
197 extern unsigned free_elements;
198 extern unsigned total_memory;
199 #if ((DEBUG) & DEBUG_I)
200 static unsigned emit_pkts, emit_queue;
201 static uint64_t size_total;
202 static unsigned pkts_total, pkts_total_fragmented;
203 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
204 static unsigned pkts_pending, pkts_pending_done;
205 static unsigned pending_queue_trace, pending_queue_trace_candidate;
206 static unsigned flows_total, flows_fragmented;
208 static unsigned emit_count;
209 static uint32_t emit_sequence;
210 static unsigned emit_rate_bytes, emit_rate_delay;
211 static struct Time emit_time;
212 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
213 static pthread_t thid;
214 static sigset_t sig_mask;
215 static struct sched_param schedp;
216 static int sched_min, sched_max;
217 static int npeers, npeers_rot;
218 static struct peer *peers;
220 static char cur_output_file[MAX_PATH_LEN];
222 static struct Flow *flows[1 << HASH_BITS];
223 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
225 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
226 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
228 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
229 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
230 static struct Flow *pending_head, *pending_tail;
231 static struct Flow *scan_frag_dreg;
233 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
234 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
235 static struct Flow *flows_emit;
237 static char ident[256] = "fprobe-ulog";
238 static FILE *pidfile;
239 static char *pidfilepath;
242 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
243 static struct ipulog_handle *ulog_handle;
244 static uint32_t ulog_gmask = 1;
245 static char *cap_buf;
246 static int nsnmp_rules;
247 static struct snmp_rule *snmp_rules;
248 static struct passwd *pw = 0;
253 "fprobe-ulog: a NetFlow probe. Version %s\n"
254 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
256 "-h\t\tDisplay this help\n"
257 "-U <mask>\tULOG group bitwise mask [1]\n"
258 "-s <seconds>\tHow often scan for expired flows [5]\n"
259 "-g <seconds>\tFragmented flow lifetime [30]\n"
260 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
261 "-f <filename>\tLog flow data in a file\n"
262 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
263 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
264 "-a <address>\tUse <address> as source for NetFlow flow\n"
265 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
266 "-M\t\tUse netfilter mark value as ToS flag\n"
267 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
268 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
269 "-q <flows>\tPending queue length [100]\n"
270 "-B <kilobytes>\tKernel capture buffer size [0]\n"
271 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
272 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
273 "-c <directory>\tDirectory to chroot to\n"
274 "-u <user>\tUser to run as\n"
275 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
276 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
277 "-y <remote:port>\tAddress of the NetFlow collector\n"
278 "-f <writable file>\tFile to write data into\n"
279 "-T <n>\tRotate log file every n epochs\n"
280 "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
281 "-E <[1..60]>\tSize of an epoch in minutes\n"
282 "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
284 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
288 #if ((DEBUG) & DEBUG_I)
291 my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
292 pkts_total, pkts_total_fragmented, size_total,
293 pkts_pending - pkts_pending_done, pending_queue_trace);
294 my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
295 pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
296 my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
297 flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
298 my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
299 total_elements, free_elements, total_memory);
303 void sighandler(int sig)
307 sigs |= SIGTERM_MASK;
309 #if ((DEBUG) & DEBUG_I)
311 sigs |= SIGUSR1_MASK;
/* Fill *now with the current wall-clock time (struct Time holds sec + usec). */
317 void gettime(struct Time *now)
/* NOTE(review): intervening lines are missing from this view; presumably a
 * gettimeofday() call fills a local struct timeval t — confirm in full source. */
323 now->usec = t.tv_usec;
/* Difference t1 - t2 in whole minutes (integer division truncates; usec ignored). */
327 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
329 return (t1->sec - t2->sec)/60;
/* Difference t1 - t2 in milliseconds, combining the sec and usec fields. */
332 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
334 return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
337 /* Uptime in milliseconds (relative to start_time, via cmpmtime()) */
338 uint32_t getuptime(struct Time *t)
340 /* Maximum uptime is about 49/2 days (uint32_t milliseconds wraps at ~49.7 days) */
341 return cmpmtime(t, &start_time);
344 /* Uptime in minutes (relative to start_time, via cmpMtime()) */
345 uint32_t getuptime_minutes(struct Time *t)
347 /* Maximum uptime is about 49/2 days */
348 return cmpMtime(t, &start_time);
/* Hash a flow into the flows[] table. Fragmented flows hash only the
 * Flow_F prefix (no ports yet); unfragmented flows hash the larger
 * Flow_TL prefix that includes transport-layer fields. */
351 hash_t hash_flow(struct Flow *flow)
353 if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
354 else return hash(flow, sizeof(struct Flow_TL));
/* Map an interface name to an SNMP index.
 * Order: empty name -> 0; first matching -X rule (basename prefix plus the
 * numeric suffix offset by rule base); otherwise the kernel's
 * if_nametoindex() value. Return path for a final fallback is not visible
 * in this view. */
357 uint16_t snmp_index(char *name) {
360 if (!*name) return 0;
362 for (i = 0; (int) i < nsnmp_rules; i++) {
363 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
364 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
367 if ((i = if_nametoindex(name))) return i;
/* Field-by-field copy of a Flow record from src to dst.
 * (Address/port fields copied on lines not sampled in this view;
 * the visible fields cover accounting, timing and flags.) */
372 inline void copy_flow(struct Flow *src, struct Flow *dst)
379 dst->slice_id = src->slice_id;
380 dst->proto = src->proto;
381 dst->tcp_flags = src->tcp_flags;
385 dst->pkts = src->pkts;
386 dst->size = src->size;
387 dst->sizeF = src->sizeF;
388 dst->sizeP = src->sizeP;
389 dst->ctime = src->ctime;
390 dst->mtime = src->mtime;
391 dst->flags = src->flags;
/* Restore cur_epoch from LAST_EPOCH_FILE after a restart, then advance it
 * by one so the last written epoch file is not overwritten.
 * NOTE(review): the read buffer is not visibly NUL-terminated before
 * sscanf() in this view — confirm the full source terminates snum[len]. */
394 void read_cur_epoch() {
396 /* Reset to -1 in case the read fails */
398 fd = open(LAST_EPOCH_FILE, O_RDONLY);
400 char snum[MAX_EPOCH_SIZE];
402 len = read(fd, snum, MAX_EPOCH_SIZE-1);
405 sscanf(snum,"%d",&cur_epoch);
406 cur_epoch++; /* Let's not stone the last epoch */
414 /* Dumps the current epoch in a file to cope with
415 * reboots and killings of fprobe */
417 void update_cur_epoch_file(int n) {
419 char snum[MAX_EPOCH_SIZE];
420 len=snprintf(snum, MAX_EPOCH_SIZE-1,"%d", n);
/* World-readable so external tools can inspect the epoch counter. */
421 fd = open(LAST_EPOCH_FILE, O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
423 my_log(LOG_ERR, "open() failed: %s.The next restart will resume logging from epoch id 0.",LAST_EPOCH_FILE);
/* NOTE(review): write() return value is not checked here — a short or
 * failed write would silently lose the epoch state. */
426 write(fd, snum, len);
430 /* Get the file descriptor corresponding to the current file.
431 * The kludgy implementation is to abstract away the 'current
432 * file descriptor', which may also be a socket.
/* Returns the fd to write flow data to, rotating to a new epoch file when
 * the epoch length elapses, on startup, or when wrapping to epoch 0.
 * Also throttles on low disk space via fstatfs(). */
435 unsigned get_data_file_fd(char *fname, int cur_fd) {
439 struct statfs statfs;
442 /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
443 * doesn't solve the problem */
445 cur_uptime = getuptime_minutes(&now);
447 if (cur_fd != START_DATA_FD) {
448 if (fstatfs(cur_fd, &statfs) == -1) {
449 my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
/* Disk nearly full AND we are at the newest epoch: dropping data is the
 * only safe option (overwriting older epochs frees space otherwise). */
452 if (min_free && (statfs.f_bavail < min_free)
453 && (cur_epoch==last_peak))
455 my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
460 assume that we can reclaim space by overwriting our own files
461 and that the difference in size will not fill the disk - sapan
466 /* If epoch length has been exceeded,
467 * or we're starting up
468 * or we're going back to the first epoch */
469 if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
471 prev_uptime = cur_uptime;
472 cur_epoch = (cur_epoch + 1) % log_epochs;
473 if (cur_epoch>last_peak) last_peak = cur_epoch;
476 /* Compress the finished file */
477 char gzip_cmd[MAX_PATH_LEN+sizeof("gzip -f ")];
478 snprintf(gzip_cmd, MAX_PATH_LEN+sizeof("gzip -f "),"gzip -f %s",cur_output_file);
/* NOTE(review): open() with O_CREAT but no mode argument is undefined
 * behavior; the fchmod() below papers over it — confirm in full source. */
481 snprintf(cur_output_file,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
482 if ((write_fd = open(cur_output_file, O_RDWR|O_CREAT|O_TRUNC)) < 0) {
483 my_log(LOG_ERR, "open(): %s (%s)\n", cur_output_file, strerror(errno));
486 if (fchmod(write_fd,S_IRUSR|S_IWUSR|S_IROTH|S_IRGRP) == -1) {
487 my_log(LOG_ERR, "fchmod() failed: %s (%s). Continuing...\n", cur_output_file, strerror(errno));
489 update_cur_epoch_file(cur_epoch);
/* Walk the hash chain 'where' looking for a flow matching 'what'
 * (same src/dst IP and protocol; then ports for unfragmented pairs or
 * IP id for fragmented pairs). If prev is non-NULL it is used to carry
 * the chain link pointer in and out, so callers can unlink the match. */
498 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
500 struct Flow **flowpp;
506 if (prev) flowpp = *prev;
509 if (where->sip.s_addr == what->sip.s_addr
510 && where->dip.s_addr == what->dip.s_addr
511 && where->proto == what->proto) {
/* flags sum: 0 = both unfragmented, 2*FLOW_FRAG = both fragmented;
 * a mixed pair falls through (no match). */
512 switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
514 /* Both unfragmented */
515 if ((what->sp == where->sp)
516 && (what->dp == where->dp)) goto done;
519 /* Both fragmented */
520 if (where->id == what->id) goto done;
524 flowpp = &where->next;
528 if (prev) *prev = flowpp;
/* Insert 'flow' into the hash table (COPY_INTO allocates a new cell and
 * copies; MOVE_INTO links the given cell itself and frees it when merged).
 * If a matching entry already exists, merge the accounting fields into it.
 * Completed fragment reassembly re-dispatches the flow without FLOW_FRAG.
 * Holds flows_mutex[h] for the duration; recursion at line 633 re-locks a
 * different (recomputed) bucket. */
532 int put_into(struct Flow *flow, int flag
533 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
540 struct Flow *flown, **flowpp;
541 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
546 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
547 sprintf(buf, " %x H:%04x", (unsigned) flow, h);
550 pthread_mutex_lock(&flows_mutex[h]);
552 if (!(flown = find(flows[h], flow, &flowpp))) {
553 /* No suitable flow found - add */
554 if (flag == COPY_INTO) {
555 if ((flown = mem_alloc())) {
556 copy_flow(flow, flown);
559 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
560 my_log(LOG_ERR, "%s %s. %s",
561 "mem_alloc():", strerror(errno), "packet lost");
566 flow->next = flows[h];
568 #if ((DEBUG) & DEBUG_I)
570 if (flow->flags & FLOW_FRAG) flows_fragmented++;
572 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
574 sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
579 /* Found suitable flow - update */
580 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
581 sprintf(buf, " +> %x", (unsigned) flown);
/* Keep the widest time window: latest mtime, earliest ctime. */
584 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
585 flown->mtime = flow->mtime;
586 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
587 flown->ctime = flow->ctime;
588 flown->tcp_flags |= flow->tcp_flags;
589 flown->size += flow->size;
590 flown->pkts += flow->pkts;
592 /* The slice_id of the first slice_id of a flow is misleading. Reset the slice_id of the flow
593 * if a better value comes along. A good example of this is that by the time CoDemux sets the
594 * peercred of a flow, it has already been accounted for here and attributed to root. */
596 if (flown->slice_id<1)
597 flown->slice_id = flow->slice_id;
600 if (flow->flags & FLOW_FRAG) {
601 /* Fragmented flow require some additional work */
602 if (flow->flags & FLOW_TL) {
605 Several packets with FLOW_TL (attack)
607 flown->sp = flow->sp;
608 flown->dp = flow->dp;
610 if (flow->flags & FLOW_LASTFRAG) {
613 Several packets with FLOW_LASTFRAG (attack)
615 flown->sizeP = flow->sizeP;
617 flown->flags |= flow->flags;
618 flown->sizeF += flow->sizeF;
/* sizeP is the whole-datagram size learned from the last fragment;
 * once accumulated sizeF reaches it, the datagram is complete. */
619 if ((flown->flags & FLOW_LASTFRAG)
620 && (flown->sizeF >= flown->sizeP)) {
621 /* All fragments received - flow reassembled */
622 *flowpp = flown->next;
623 pthread_mutex_unlock(&flows_mutex[h]);
624 #if ((DEBUG) & DEBUG_I)
629 flown->flags &= ~FLOW_FRAG;
630 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
/* Re-insert as an unfragmented flow (hashes to a different bucket). */
633 ret = put_into(flown, MOVE_INTO
634 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
640 if (flag == MOVE_INTO) mem_free(flow);
642 pthread_mutex_unlock(&flows_mutex[h]);
/* Serialize 'fields' NetFlow fields described by 'format' into the packet
 * buffer at p, reading values from 'flow' (or from emit-time globals for
 * header fields). Returns the advanced write pointer. Multi-byte values
 * are converted to network byte order except where the flow already stores
 * them that way (ports) or deliberately not (slice_id). */
648 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
652 for (i = 0; i < fields; i++) {
653 #if ((DEBUG) & DEBUG_F)
654 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
657 case NETFLOW_IPV4_SRC_ADDR:
658 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
659 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
662 case NETFLOW_IPV4_DST_ADDR:
663 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
/* Hard-coded test hook: log flows destined to 10.0.0.8. */
664 if ((flow->dip.s_addr == inet_addr("10.0.0.8"))) {
665 my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
667 p += NETFLOW_IPV4_DST_ADDR_SIZE;
670 case NETFLOW_INPUT_SNMP:
671 *((uint16_t *) p) = htons(flow->iif);
672 p += NETFLOW_INPUT_SNMP_SIZE;
675 case NETFLOW_OUTPUT_SNMP:
676 *((uint16_t *) p) = htons(flow->oif);
677 p += NETFLOW_OUTPUT_SNMP_SIZE;
680 case NETFLOW_PKTS_32:
681 *((uint32_t *) p) = htonl(flow->pkts);
682 p += NETFLOW_PKTS_32_SIZE;
685 case NETFLOW_BYTES_32:
686 *((uint32_t *) p) = htonl(flow->size);
687 p += NETFLOW_BYTES_32_SIZE;
690 case NETFLOW_FIRST_SWITCHED:
691 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
692 p += NETFLOW_FIRST_SWITCHED_SIZE;
695 case NETFLOW_LAST_SWITCHED:
696 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
697 p += NETFLOW_LAST_SWITCHED_SIZE;
/* Ports are captured from the packet and kept in network byte order,
 * so no htons() here. */
700 case NETFLOW_L4_SRC_PORT:
701 *((uint16_t *) p) = flow->sp;
702 p += NETFLOW_L4_SRC_PORT_SIZE;
705 case NETFLOW_L4_DST_PORT:
706 *((uint16_t *) p) = flow->dp;
707 p += NETFLOW_L4_DST_PORT_SIZE;
711 *((uint8_t *) p) = flow->proto;
712 p += NETFLOW_PROT_SIZE;
715 case NETFLOW_SRC_TOS:
716 *((uint8_t *) p) = flow->tos;
717 p += NETFLOW_SRC_TOS_SIZE;
720 case NETFLOW_TCP_FLAGS:
721 *((uint8_t *) p) = flow->tcp_flags;
722 p += NETFLOW_TCP_FLAGS_SIZE;
725 case NETFLOW_VERSION:
726 *((uint16_t *) p) = htons(netflow->Version);
727 p += NETFLOW_VERSION_SIZE;
731 *((uint16_t *) p) = htons(emit_count);
732 p += NETFLOW_COUNT_SIZE;
736 *((uint32_t *) p) = htonl(getuptime(&emit_time));
737 p += NETFLOW_UPTIME_SIZE;
740 case NETFLOW_UNIX_SECS:
741 *((uint32_t *) p) = htonl(emit_time.sec);
742 p += NETFLOW_UNIX_SECS_SIZE;
745 case NETFLOW_UNIX_NSECS:
746 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
747 p += NETFLOW_UNIX_NSECS_SIZE;
750 case NETFLOW_FLOW_SEQUENCE:
/* Sequence deliberately zeroed; per-peer seq is patched in later via
 * netflow->SeqOffset by the emit loop. */
751 //*((uint32_t *) p) = htonl(emit_sequence);
752 *((uint32_t *) p) = 0;
753 p += NETFLOW_FLOW_SEQUENCE_SIZE;
757 /* Unsupported (uint8_t) */
758 case NETFLOW_ENGINE_TYPE:
759 case NETFLOW_ENGINE_ID:
760 case NETFLOW_FLAGS7_1:
761 case NETFLOW_SRC_MASK:
762 case NETFLOW_DST_MASK:
764 my_log(LOG_CRIT, "Adding SRC/DST masks: this version of fprobe is seriously broken\n");
767 *((uint8_t *) p) = 0;
768 p += NETFLOW_PAD8_SIZE;
770 case NETFLOW_SLICE_ID:
771 *((uint32_t *) p) = flow->slice_id;
772 p += NETFLOW_SLICE_ID_SIZE;
775 /* Unsupported (uint16_t) */
778 case NETFLOW_FLAGS7_2:
779 *((uint16_t *) p) = 0;
780 p += NETFLOW_PAD16_SIZE;
784 /* Unsupported (uint32_t) */
785 case NETFLOW_IPV4_NEXT_HOP:
786 case NETFLOW_ROUTER_SC:
787 *((uint32_t *) p) = 0;
788 p += NETFLOW_PAD32_SIZE;
792 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
793 format, i, format[i]);
797 #if ((DEBUG) & DEBUG_F)
798 my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
/* Emit thread body (function header line not visible in this view):
 * drains the flows_emit list, fills NetFlow PDUs, and writes them to each
 * configured peer (file, mirror, or rotate). */
805 Workaround for clone()-based threads
806 Try to change EUID independently of main thread
810 setregid(pw->pw_gid, pw->pw_gid);
811 setreuid(pw->pw_uid, pw->pw_uid);
820 struct timespec timeout;
821 int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
/* Flow records are appended directly after the PDU header. */
823 p = (void *) &emit_packet + netflow->HeaderSize;
828 //pthread_mutexattr_setprotocol(&md->MutexAttr,PTHREAD_PRIO_INHERIT);
831 pthread_mutex_lock(&emit_mutex);
832 while (!flows_emit) {
833 gettimeofday(&now, 0);
834 timeout.tv_sec = now.tv_sec + emit_timeout;
835 /* Do not wait for emit_packet to fill completely - it may take too long */
836 if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
837 my_log(LOG_INFO,"Timeout: %d, %d",emit_count, timeout.tv_sec);
838 pthread_mutex_unlock(&emit_mutex);
843 flows_emit = flows_emit->next;
844 #if ((DEBUG) & DEBUG_I)
847 pthread_mutex_unlock(&emit_mutex);
/* NOTE(review): resetting start_time here looks like uptime re-baselining;
 * the surrounding condition is not visible — confirm against full source. */
851 gettime(&start_time);
852 start_time.sec -= start_time_offset;
855 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
859 printf("Emit count = %d\n", emit_count);
862 if (emit_count == netflow->MaxFlows) {
865 p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
866 size = netflow->HeaderSize + emit_count * netflow->FlowSize;
867 /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
868 #ifdef STD_NETFLOW_PDU
869 if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
872 for (i = 0; i < npeers; i++) {
873 if (peers[i].type == PEER_FILE) {
/* Patch the per-peer sequence number into the already-built PDU. */
874 if (netflow->SeqOffset)
875 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
876 peers[i].write_fd = get_data_file_fd(peers[i].fname, peers[i].write_fd);
877 ret = write(peers[i].write_fd, emit_packet, size);
880 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
881 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
882 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
886 #if ((DEBUG) & DEBUG_E)
888 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
889 emit_count, i + 1, peers[i].seq);
892 peers[i].seq += emit_count;
/* Optional pacing: sleep emit_rate_delay ns per emit_rate_bytes sent. */
895 if (emit_rate_bytes) {
897 delay = sent / emit_rate_bytes;
899 sent %= emit_rate_bytes;
901 timeout.tv_nsec = emit_rate_delay * delay;
902 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
907 if (peers[i].type == PEER_MIRROR) goto sendreal;
/* ROTATE peers take turns: only the current one in the rotation sends. */
909 if (peers[i].type == PEER_ROTATE)
910 if (peer_rot_cur++ == peer_rot_work) {
912 if (netflow->SeqOffset)
913 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
914 ret = send(peers[i].write_fd, emit_packet, size, 0);
916 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
917 my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
918 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
921 #if ((DEBUG) & DEBUG_E)
923 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
924 emit_count, i + 1, peers[i].seq);
927 peers[i].seq += emit_count;
930 if (emit_rate_bytes) {
932 delay = sent / emit_rate_bytes;
934 sent %= emit_rate_bytes;
936 timeout.tv_nsec = emit_rate_delay * delay;
937 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
942 if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
943 emit_sequence += emit_count;
945 #if ((DEBUG) & DEBUG_I)
/* Consumer side of the pending ring: waits for the capture thread to mark
 * entries FLOW_PENDING, then copies each into the hash table (put_into
 * COPY_INTO) and clears the slot for reuse. */
952 void *unpending_thread()
955 struct timespec timeout;
956 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
963 pthread_mutex_lock(&unpending_mutex);
966 while (!(pending_tail->flags & FLOW_PENDING)) {
967 gettimeofday(&now, 0);
968 timeout.tv_sec = now.tv_sec + unpending_timeout;
969 pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
972 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
975 if (put_into(pending_tail, COPY_INTO
976 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
980 #if ((DEBUG) & DEBUG_I)
981 pkts_lost_unpending++;
985 #if ((DEBUG) & DEBUG_U)
986 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
/* Clearing flags releases the slot back to the capture thread. */
989 pending_tail->flags = 0;
990 pending_tail = pending_tail->next;
991 #if ((DEBUG) & DEBUG_I)
/* Scan thread body (function header line not visible in this view):
 * periodically walks every hash bucket, expiring fragmented flows into the
 * scan_frag_dreg chain and idle/aged flows onto the flows_emit list. */
999 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1003 struct Flow *flow, **flowpp;
1005 struct timespec timeout;
1009 timeout.tv_nsec = 0;
1010 pthread_mutex_lock(&scan_mutex);
1014 timeout.tv_sec = now.sec + scan_interval;
1015 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
1018 #if ((DEBUG) & DEBUG_S)
1019 my_log(LOG_DEBUG, "S: %d", now.sec);
1021 for (i = 0; i < 1 << HASH_BITS ; i++) {
1022 pthread_mutex_lock(&flows_mutex[i]);
1026 if (flow->flags & FLOW_FRAG) {
1027 /* Process fragmented flow */
1028 if ((now.sec - flow->mtime.sec) > frag_lifetime) {
1029 /* Fragmented flow expired - put it into special chain */
1030 #if ((DEBUG) & DEBUG_I)
1034 *flowpp = flow->next;
1036 flow->flags &= ~FLOW_FRAG;
1037 flow->next = scan_frag_dreg;
1038 scan_frag_dreg = flow;
1043 /* Flow is not fragmented */
/* Expire on inactivity (no packets for inactive_lifetime) or on total
 * age (active_lifetime since creation). */
1044 if ((now.sec - flow->mtime.sec) > inactive_lifetime
1045 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
1047 #if ((DEBUG) & DEBUG_S)
1048 my_log(LOG_DEBUG, "S: E %x", flow);
1050 #if ((DEBUG) & DEBUG_I)
1053 *flowpp = flow->next;
1054 pthread_mutex_lock(&emit_mutex);
1055 flow->next = flows_emit;
1057 #if ((DEBUG) & DEBUG_I)
1060 pthread_mutex_unlock(&emit_mutex);
1065 flowpp = &flow->next;
1068 pthread_mutex_unlock(&flows_mutex[i]);
1070 if (flows_emit) pthread_cond_signal(&emit_cond);
/* Expired fragments are re-inserted unfragmented so partial data is
 * still accounted, rather than dropped. */
1072 while (scan_frag_dreg) {
1073 flow = scan_frag_dreg;
1074 scan_frag_dreg = flow->next;
1075 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1078 put_into(flow, MOVE_INTO
1079 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1083 #if ((DEBUG) & DEBUG_S)
1084 my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
/* Capture thread body (function header line not visible in this view):
 * reads packets from the kernel via libipulog, parses the IPv4 header and
 * (when present) the first 8+ bytes of the transport header into the next
 * free slot of the pending ring, then signals unpending_thread. */
1092 struct ulog_packet_msg *ulog_msg;
1096 int len, off_frag, psize;
1097 #if ((DEBUG) & DEBUG_C)
1106 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1108 my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1111 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1113 #if ((DEBUG) & DEBUG_C)
1114 sprintf(logbuf, "C: %d", ulog_msg->data_len);
1117 nl = (void *) &ulog_msg->payload;
1118 psize = ulog_msg->data_len;
/* Discard anything too short to be an IP header, or not IPv4. */
1121 if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1122 #if ((DEBUG) & DEBUG_C)
1123 strcat(logbuf, " U");
1124 my_log(LOG_DEBUG, "%s", logbuf);
1126 #if ((DEBUG) & DEBUG_I)
/* A non-zero flags word means unpending_thread has not consumed this
 * slot yet: the ring is full and the packet must be dropped. */
1132 if (pending_head->flags) {
1133 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1135 # if ((DEBUG) & DEBUG_C)
1140 "pending queue full:", "packet lost");
1142 #if ((DEBUG) & DEBUG_I)
1143 pkts_lost_capture++;
1148 #if ((DEBUG) & DEBUG_I)
1152 flow = pending_head;
1154 /* ?FIXME? Add sanity check for ip_len? */
1155 flow->size = ntohs(nl->ip_len);
1156 #if ((DEBUG) & DEBUG_I)
1157 size_total += flow->size;
1160 flow->sip = nl->ip_src;
1161 flow->dip = nl->ip_dst;
1162 flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1164 /* It's going to be expensive calling this syscall on every flow.
1165 * We should keep a local hash table, for now just bear the overhead... - Sapan*/
1169 if (ulog_msg->mark > 0) {
1170 flow->slice_id = xid_to_slice_id(ulog_msg->mark);
1173 if (flow->slice_id < 1)
1174 flow->slice_id = ulog_msg->mark; // Couldn't look up the slice id, let's at least store the local xid
/* Hard-coded test hook for flows to/from 10.0.0.8. */
1177 if ((flow->dip.s_addr == inet_addr("10.0.0.8")) || (flow->sip.s_addr == inet_addr("10.0.0.8"))) {
1178 my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->slice_id);
1180 flow->iif = snmp_index(ulog_msg->indev_name);
1181 flow->oif = snmp_index(ulog_msg->outdev_name);
1182 flow->proto = nl->ip_p;
1184 flow->tcp_flags = 0;
1188 /* Packets captured from OUTPUT table don't contain a valid timestamp */
1189 if (ulog_msg->timestamp_sec) {
1190 flow->ctime.sec = ulog_msg->timestamp_sec;
1191 flow->ctime.usec = ulog_msg->timestamp_usec;
1192 } else gettime(&flow->ctime);
1193 flow->mtime = flow->ctime;
/* Fragment offset field is in 8-byte units; convert to bytes. */
1195 off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1198 Offset (from network layer) to transport layer header/IP data
1199 IOW IP header size ;-)
1202 Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1204 off_tl = nl->ip_hl << 2;
1205 tl = (void *) nl + off_tl;
1207 /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1208 flow->sizeF = ntohs(nl->ip_len) - off_tl;
1210 if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1211 if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1213 if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1214 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1215 #if ((DEBUG) & DEBUG_C)
1216 strcat(logbuf, " F");
1218 #if ((DEBUG) & DEBUG_I)
1219 pkts_total_fragmented++;
1221 flow->flags |= FLOW_FRAG;
1222 flow->id = nl->ip_id;
1224 if (!(ntohs(nl->ip_off) & IP_MF)) {
1225 /* Packet without IP_MF is the last fragment: its offset plus its
 * data size gives the whole datagram size */
1226 flow->flags |= FLOW_LASTFRAG;
1227 /* size = frag_offset*8 + data_size */
1228 flow->sizeP = off_frag + flow->sizeF;
1232 #if ((DEBUG) & DEBUG_C)
1233 sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1234 strcat(logbuf, buf);
1235 sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1236 strcat(logbuf, buf);
1240 Fortunately most interesting transport layer information fit
1241 into first 8 bytes of IP data field (minimal nonzero size).
1242 Thus we don't need actual packet reassembling to build whole
1243 transport layer data. We only check the fragment offset for
1244 zero value to find packet with this information.
1246 if (!off_frag && psize >= 8) {
1247 switch (flow->proto) {
1250 flow->sp = ((struct udphdr *)tl)->uh_sport;
1251 flow->dp = ((struct udphdr *)tl)->uh_dport;
/* ICMP type/code are stored in the port fields (NetFlow convention). */
1256 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1257 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1260 #ifdef ICMP_TRICK_CISCO
1262 flow->dp = *((int32_t *) tl);
1267 /* Unknown transport layer */
1268 #if ((DEBUG) & DEBUG_C)
1269 strcat(logbuf, " U");
1276 #if ((DEBUG) & DEBUG_C)
1277 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1278 strcat(logbuf, buf);
1280 flow->flags |= FLOW_TL;
1284 /* Check for tcp flags presence (including CWR and ECE). */
1285 if (flow->proto == IPPROTO_TCP
1287 && psize >= 16 - off_frag) {
1288 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1289 #if ((DEBUG) & DEBUG_C)
1290 sprintf(buf, " TCP:%x", flow->tcp_flags);
1291 strcat(logbuf, buf);
1295 #if ((DEBUG) & DEBUG_C)
1296 sprintf(buf, " => %x", (unsigned) flow);
1297 strcat(logbuf, buf);
1298 my_log(LOG_DEBUG, "%s", logbuf);
1301 #if ((DEBUG) & DEBUG_I)
1303 pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1304 if (pending_queue_trace < pending_queue_trace_candidate)
1305 pending_queue_trace = pending_queue_trace_candidate;
1308 /* Flow complete - inform unpending_thread() about it */
1309 pending_head->flags |= FLOW_PENDING;
1310 pending_head = pending_head->next;
1312 pthread_cond_signal(&unpending_cond);
1318 /* Copied out of CoDemux */
/* Daemonize: fork, parent records the child PID in PIDFILE and exits;
 * child becomes the daemon and (per the comment below) starts a new
 * session and changes directory. */
1320 static int init_daemon() {
1324 pidfile = fopen(PIDFILE, "w");
1325 if (pidfile == NULL) {
1326 my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
1329 if ((pid = fork()) < 0) {
1331 my_log(LOG_ERR, "Could not fork!\n");
1334 else if (pid != 0) {
1335 /* i'm the parent, writing down the child pid */
1336 fprintf(pidfile, "%u\n", pid);
1341 /* close the pid file */
1344 /* routines for any daemon process
1345 1. create a new session
1346 2. change directory to the root
1347 3. change the file creation permission
/* NOTE(review): chdir() return value is unchecked; failure would leave the
 * daemon in an unexpected working directory. */
1350 chdir("/var/local/fprobe");
1356 int main(int argc, char **argv)
1359 char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1360 int c, i, write_fd, memory_limit = 0;
1361 struct addrinfo hints, *res;
1362 struct sockaddr_in saddr;
1363 pthread_attr_t tattr;
1364 struct sigaction sigact;
1365 static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1366 struct timeval timeout;
1368 sched_min = sched_get_priority_min(SCHED);
1369 sched_max = sched_get_priority_max(SCHED);
1371 memset(&saddr, 0 , sizeof(saddr));
1372 memset(&hints, 0 , sizeof(hints));
1373 hints.ai_flags = AI_PASSIVE;
1374 hints.ai_family = AF_INET;
1375 hints.ai_socktype = SOCK_DGRAM;
1377 /* Process command line options */
1380 while ((c = my_getopt(argc, argv, parms)) != -1) {
1390 if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1391 if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1392 if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1393 if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1394 if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1395 if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1396 if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1397 if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1398 if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1399 if (parms[nflag].count) {
1400 switch (atoi(parms[nflag].arg)) {
1402 netflow = &NetFlow1;
1409 netflow = &NetFlow7;
1413 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1417 if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1418 if (parms[lflag].count) {
1419 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1422 sprintf(errpbuf, "[%s]", log_suffix);
1423 strcat(ident, errpbuf);
1426 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1427 if (log_suffix) *--log_suffix = ':';
1429 if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1431 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1434 sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1435 if (parms[qflag].count) {
1436 pending_queue_length = atoi(parms[qflag].arg);
1437 if (pending_queue_length < 1) {
1438 fprintf(stderr, "Illegal %s\n", "pending queue length");
1442 if (parms[rflag].count) {
1443 schedp.sched_priority = atoi(parms[rflag].arg);
1444 if (schedp.sched_priority
1445 && (schedp.sched_priority < sched_min
1446 || schedp.sched_priority > sched_max)) {
1447 fprintf(stderr, "Illegal %s\n", "realtime priority");
1451 if (parms[Bflag].count) {
1452 sockbufsize = atoi(parms[Bflag].arg) << 10;
1454 if (parms[bflag].count) {
1455 bulk_quantity = atoi(parms[bflag].arg);
1456 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1457 fprintf(stderr, "Illegal %s\n", "bulk size");
1461 if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1462 if (parms[Xflag].count) {
1463 for(i = 0; parms[Xflag].arg[i]; i++)
1464 if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1465 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1467 rule = strtok(parms[Xflag].arg, ":");
1468 for (i = 0; rule; i++) {
1469 snmp_rules[i].len = strlen(rule);
1470 if (snmp_rules[i].len > IFNAMSIZ) {
1471 fprintf(stderr, "Illegal %s\n", "interface basename");
1474 strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1475 if (!*(rule - 1)) *(rule - 1) = ',';
1476 rule = strtok(NULL, ",");
1478 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1481 snmp_rules[i].base = atoi(rule);
1483 rule = strtok(NULL, ":");
1487 if (parms[tflag].count)
1488 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1489 if (parms[aflag].count) {
1490 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1492 fprintf(stderr, "Illegal %s\n", "source address");
1495 saddr = *((struct sockaddr_in *) res->ai_addr);
1499 if (parms[uflag].count)
1500 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1501 fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1506 /* Process collectors parameters. Brrrr... :-[ */
1508 npeers = argc - optind;
1510 /* Send to remote Netflow collector */
1511 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1512 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1514 if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1516 if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1517 fprintf(stderr, "socket(): %s\n", strerror(errno));
1520 peers[npeers].write_fd = write_fd;
1521 peers[npeers].type = PEER_MIRROR;
1522 peers[npeers].laddr = saddr;
1523 peers[npeers].seq = 0;
1524 if ((lhost = strchr(dport, '/'))) {
1526 if ((type = strchr(lhost, '/'))) {
1534 peers[npeers].type = PEER_ROTATE;
1543 if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1544 peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1548 if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1549 sizeof(struct sockaddr_in))) {
1550 fprintf(stderr, "bind(): %s\n", strerror(errno));
1553 if (getaddrinfo(dhost, dport, &hints, &res)) {
1555 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1558 peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1560 if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1561 sizeof(struct sockaddr_in))) {
1562 fprintf(stderr, "connect(): %s\n", strerror(errno));
1566 /* Restore command line */
1567 if (type) *--type = '/';
1568 if (lhost) *--lhost = '/';
1572 else if (parms[fflag].count) {
1574 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1575 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1576 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1578 peers[npeers].write_fd = START_DATA_FD;
1579 peers[npeers].type = PEER_FILE;
1580 peers[npeers].seq = 0;
1589 if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1590 ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1592 fprintf(stderr, "libipulog initialization error: %s",
1593 ipulog_strerror(ipulog_errno));
1597 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1598 &sockbufsize, sizeof(sockbufsize)) < 0)
1599 fprintf(stderr, "setsockopt(): %s", strerror(errno));
1601 /* Daemonize (if log destination stdout-free) */
1603 my_log_open(ident, verbosity, log_dest);
1607 if (!(log_dest & 2)) {
1608 /* Crash-proofing - Sapan*/
1612 fprintf(stderr, "fork(): %s", strerror(errno));
1617 freopen("/dev/null", "r", stdin);
1618 freopen("/dev/null", "w", stdout);
1619 freopen("/dev/null", "w", stderr);
1623 while (wait3(NULL,0,NULL) < 1);
1627 setvbuf(stdout, (char *)0, _IONBF, 0);
1628 setvbuf(stderr, (char *)0, _IONBF, 0);
1632 sprintf(errpbuf, "[%ld]", (long) pid);
1633 strcat(ident, errpbuf);
1635 /* Initialization */
1637 init_slice_id_hash();
1638 hash_init(); /* Actually for crc16 only */
1639 mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1640 for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1643 /* Hope 12 days is enough :-/ */
1644 start_time_offset = 1 << 20;
1646 /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1648 gettime(&start_time);
1651 Build static pending queue as circular buffer.
1653 if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1654 pending_tail = pending_head;
1655 for (i = pending_queue_length - 1; i--;) {
1656 if (!(pending_tail->next = mem_alloc())) {
1658 my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1661 pending_tail = pending_tail->next;
1663 pending_tail->next = pending_head;
1664 pending_tail = pending_head;
1666 sigemptyset(&sig_mask);
1667 sigact.sa_handler = &sighandler;
1668 sigact.sa_mask = sig_mask;
1669 sigact.sa_flags = 0;
1670 sigaddset(&sig_mask, SIGTERM);
1671 sigaction(SIGTERM, &sigact, 0);
1672 #if ((DEBUG) & DEBUG_I)
1673 sigaddset(&sig_mask, SIGUSR1);
1674 sigaction(SIGUSR1, &sigact, 0);
1676 if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1677 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1681 my_log(LOG_INFO, "Starting %s...", VERSION);
1683 if (parms[cflag].count) {
1684 if (chdir(parms[cflag].arg) || chroot(".")) {
1685 my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1690 schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1691 pthread_attr_init(&tattr);
1692 for (i = 0; i < THREADS - 1; i++) {
1693 if (schedp.sched_priority > 0) {
1694 if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1695 (pthread_attr_setschedparam(&tattr, &schedp))) {
1696 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1700 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1701 my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1704 pthread_detach(thid);
1705 schedp.sched_priority++;
1709 if (setgroups(0, NULL)) {
1710 my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1713 if (setregid(pw->pw_gid, pw->pw_gid)) {
1714 my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1717 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1718 my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1723 if (!(pidfile = fopen(pidfilepath, "w")))
1724 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1726 fprintf(pidfile, "%ld\n", (long) pid);
1730 my_log(LOG_INFO, "pid: %d", pid);
1731 my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1732 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1733 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1734 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1735 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1736 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1737 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1738 for (i = 0; i < nsnmp_rules; i++) {
1739 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1740 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1742 for (i = 0; i < npeers; i++) {
1743 switch (peers[i].type) {
1751 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1752 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1753 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1756 pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1758 timeout.tv_usec = 0;
1760 || (total_elements - free_elements - pending_queue_length)
1762 || pending_tail->flags) {
1765 timeout.tv_sec = scan_interval;
1766 select(0, 0, 0, 0, &timeout);
1769 if (sigs & SIGTERM_MASK && !killed) {
1770 sigs &= ~SIGTERM_MASK;
1771 my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1774 active_lifetime = -1;
1775 inactive_lifetime = -1;
1777 unpending_timeout = 1;
1779 pthread_cond_signal(&scan_cond);
1780 pthread_cond_signal(&unpending_cond);
1783 #if ((DEBUG) & DEBUG_I)
1784 if (sigs & SIGUSR1_MASK) {
1785 sigs &= ~SIGUSR1_MASK;
1790 remove(pidfilepath);
1791 #if ((DEBUG) & DEBUG_I)
1794 my_log(LOG_INFO, "Done.");