2 Copyright (C) Slava Astashonok <sla@0n.ru>
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License.
7 $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
9 Sapan Bhatia <sapanb@cs.princeton.edu>
11 7/11/2007 Added data collection (-f) functionality, slice_id support in the header and log file
14 15/11/2007 Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
20 /* stdout, stderr, freopen() */
26 /* getopt(), alarm(), getpid(), setsid(), chdir() */
39 #include <libipulog/libipulog.h>
/* Local mirror of libipulog's internal handle (netlink socket state).
 * NOTE(review): presumably duplicated here to reach fields that
 * libipulog/libipulog.h does not expose; must stay in sync with the
 * library's own definition — confirm against the installed libipulog. */
42 struct ipulog_handle {
45 struct sockaddr_nl local;
46 struct sockaddr_nl peer;
47 struct nlmsghdr* last_nlhdr;
50 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
51 #include <sys/types.h>
52 #include <netinet/in_systm.h>
53 #include <sys/socket.h>
54 #include <netinet/in.h>
55 #include <arpa/inet.h>
56 #include <netinet/ip.h>
57 #include <netinet/tcp.h>
58 #include <netinet/udp.h>
59 #include <netinet/ip_icmp.h>
62 #include <sys/param.h>
87 #include <sys/select.h>
93 #include <fprobe-ulog.h>
95 #include <my_getopt.h>
100 #define PIDFILE "/var/log/fprobe-ulog.pid"
101 #define LAST_EPOCH_FILE "/var/log/fprobe_last_epoch"
102 #define MAX_EPOCH_SIZE sizeof("32767")
103 #define STD_NETFLOW_PDU
/* Command-line option table for my_getopt(): one entry per flag,
 * each taking a required argument. Indexed elsewhere via the
 * <letter>flag enum constants (e.g. parms[Uflag]). */
133 static struct getopt_parms parms[] = {
134 {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
135 {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
136 {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
137 {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
138 {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
139 {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
140 {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
141 {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
142 {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
143 {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
145 {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
146 {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
148 {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
149 {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
150 {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
151 {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
152 {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
153 {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
154 {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
155 {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
156 {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
157 {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
162 extern int optind, opterr, optopt;
/* NetFlow export-format descriptors, defined in a sibling TU. */
165 extern struct NetFlow NetFlow1;
166 extern struct NetFlow NetFlow5;
167 extern struct NetFlow NetFlow7;
/* Sentinel fd meaning "data file not yet opened" (see get_data_file_fd). */
169 #define START_DATA_FD -5
170 #define mark_is_tos parms[Mflag].count
/* Timing knobs (seconds), overridable from the command line. */
171 static unsigned scan_interval = 5;
172 static unsigned int min_free = 0;
173 static int frag_lifetime = 30;
174 static int inactive_lifetime = 60;
175 static int active_lifetime = 300;
176 static int sockbufsize;
177 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
178 #if (MEM_BITS == 0) || (MEM_BITS == 16)
179 #define BULK_QUANTITY 10000
181 #define BULK_QUANTITY 200
/* Epoch-based log rotation state (see get_data_file_fd / read_cur_epoch). */
184 static unsigned epoch_length=60, log_epochs=1;
185 static unsigned cur_epoch=0,prev_uptime=0,last_peak=0;
187 static unsigned bulk_quantity = BULK_QUANTITY;
188 static unsigned pending_queue_length = 100;
189 static struct NetFlow *netflow = &NetFlow5;
190 static unsigned verbosity = 6;
191 static unsigned log_dest = MY_LOG_SYSLOG;
192 static struct Time start_time;
193 static long start_time_offset;
/* Memory-pool counters maintained by mem_alloc()/mem_free() elsewhere. */
196 extern unsigned total_elements;
197 extern unsigned free_elements;
198 extern unsigned total_memory;
/* Debug-only instrumentation counters. */
199 #if ((DEBUG) & DEBUG_I)
200 static unsigned emit_pkts, emit_queue;
201 static uint64_t size_total;
202 static unsigned pkts_total, pkts_total_fragmented;
203 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
204 static unsigned pkts_pending, pkts_pending_done;
205 static unsigned pending_queue_trace, pending_queue_trace_candidate;
206 static unsigned flows_total, flows_fragmented;
/* Emitter state: sequence counter, rate limiting, output packet buffer. */
208 static unsigned emit_count;
209 static uint32_t emit_sequence;
210 static unsigned emit_rate_bytes, emit_rate_delay;
211 static struct Time emit_time;
212 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
213 static pthread_t thid;
214 static sigset_t sig_mask;
215 static struct sched_param schedp;
216 static int sched_min, sched_max;
217 static int npeers, npeers_rot;
218 static struct peer *peers;
220 static char cur_output_file[MAX_PATH_LEN];
/* Flow hash table with one mutex per bucket. */
222 static struct Flow *flows[1 << HASH_BITS];
223 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
/* Inter-thread handoff queues and their synchronization primitives. */
225 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
226 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
228 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
229 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
230 static struct Flow *pending_head, *pending_tail;
231 static struct Flow *scan_frag_dreg;
233 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
234 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
235 static struct Flow *flows_emit;
237 static char ident[256] = "fprobe-ulog";
238 static FILE *pidfile;
239 static char *pidfilepath;
242 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
243 static struct ipulog_handle *ulog_handle;
244 static uint32_t ulog_gmask = 1;
245 static char *cap_buf;
246 static int nsnmp_rules;
247 static struct snmp_rule *snmp_rules;
248 static struct passwd *pw = 0;
/* Usage/help text (fragment of the usage() routine; its header and the
 * surrounding fprintf call are elided in this view). The string pieces
 * below are concatenated by the compiler into one format string. */
253 "fprobe-ulog: a NetFlow probe. Version %s\n"
254 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
256 "-h\t\tDisplay this help\n"
257 "-U <mask>\tULOG group bitwise mask [1]\n"
258 "-s <seconds>\tHow often scan for expired flows [5]\n"
259 "-g <seconds>\tFragmented flow lifetime [30]\n"
260 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
261 "-f <filename>\tLog flow data in a file\n"
262 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
263 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
264 "-a <address>\tUse <address> as source for NetFlow flow\n"
265 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
266 "-M\t\tUse netfilter mark value as ToS flag\n"
267 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
268 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
269 "-q <flows>\tPending queue length [100]\n"
270 "-B <kilobytes>\tKernel capture buffer size [0]\n"
271 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
272 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
273 "-c <directory>\tDirectory to chroot to\n"
274 "-u <user>\tUser to run as\n"
275 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
276 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
277 "-y <remote:port>\tAddress of the NetFlow collector\n"
278 "-f <writable file>\tFile to write data into\n"
279 "-T <n>\tRotate log file every n epochs\n"
280 "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
281 "-E <[1..60]>\tSize of an epoch in minutes\n"
282 "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
/* NOTE(review): "idetifier" typo above is inside a runtime string; fixing
 * it changes program output, so it is flagged here rather than changed. */
284 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
/* Debug-instrumentation dump (fragment): logs the DEBUG_I counters —
 * packets received/pending/lost, flow-cache and emitter state, and
 * memory-pool usage. Enclosing function header elided in this view. */
288 #if ((DEBUG) & DEBUG_I)
291 my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
292 pkts_total, pkts_total_fragmented, size_total,
293 pkts_pending - pkts_pending_done, pending_queue_trace);
294 my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
295 pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
296 my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
297 flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
298 my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
299 total_elements, free_elements, total_memory);
/* Signal handler: records delivered signals as bits in the global
 * `sigs` mask for deferred handling by the main loop (SIGTERM always;
 * SIGUSR1 only in DEBUG_I builds). Intervening lines elided in this view. */
303 void sighandler(int sig)
307 sigs |= SIGTERM_MASK;
309 #if ((DEBUG) & DEBUG_I)
311 sigs |= SIGUSR1_MASK;
/* Fill *now with the current wall-clock time (sec/usec), presumably
 * from gettimeofday() on the elided line — confirm. */
317 void gettime(struct Time *now)
323 now->usec = t.tv_usec;
/* Difference t1 - t2 in whole minutes (capital-M "Minute" variant). */
327 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
329 return (t1->sec - t2->sec)/60;
/* Difference t1 - t2 in milliseconds (lowercase-m "milli" variant). */
332 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
334 return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
337 /* Uptime in milliseconds */
338 uint32_t getuptime(struct Time *t)
340 /* Maximum uptime is about 49/2 days (2^32 ms before wraparound) */
341 return cmpmtime(t, &start_time);
344 /* Uptime in minutes */
345 uint32_t getuptime_minutes(struct Time *t)
/* NOTE(review): the 49/2-day comment below applies to the millisecond
 * variant; the minute counter wraps far later. */
347 /* Maximum uptime is about 49/2 days */
348 return cmpMtime(t, &start_time);
/* Hash a flow for the bucket table: fragmented flows hash only the
 * Flow_F prefix (no ports yet), complete ones the Flow_TL prefix. */
351 hash_t hash_flow(struct Flow *flow)
353 if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
354 else return hash(flow, sizeof(struct Flow_TL));
/* Map an interface name to an SNMP index: first try the user-supplied
 * -X rules (basename prefix + numeric suffix + base offset), then fall
 * back to the kernel's if_nametoindex(); 0 for an empty name. */
357 uint16_t snmp_index(char *name) {
360 if (!*name) return 0;
362 for (i = 0; (int) i < nsnmp_rules; i++) {
363 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
364 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
367 if ((i = if_nametoindex(name))) return i;
/* Field-by-field copy of a Flow record from src to dst (fields for
 * addresses/ports are on lines elided in this view). Does not copy the
 * `next` link, so the destination's chain membership is preserved. */
372 inline void copy_flow(struct Flow *src, struct Flow *dst)
379 dst->slice_id = src->slice_id;
380 dst->proto = src->proto;
381 dst->tcp_flags = src->tcp_flags;
385 dst->pkts = src->pkts;
386 dst->size = src->size;
387 dst->sizeF = src->sizeF;
388 dst->sizeP = src->sizeP;
389 dst->ctime = src->ctime;
390 dst->mtime = src->mtime;
391 dst->flags = src->flags;
/* Restore cur_epoch from LAST_EPOCH_FILE so a restarted probe does not
 * overwrite the most recent log; increments past the stored epoch. */
394 void read_cur_epoch() {
396 /* Reset to -1 in case the read fails */
398 fd = open(LAST_EPOCH_FILE, O_RDONLY);
400 char snum[MAX_EPOCH_SIZE];
402 len = read(fd, snum, MAX_EPOCH_SIZE-1);
/* NOTE(review): buffer NUL-termination before sscanf happens on an
 * elided line — confirm snum[len] = '\0' is present. */
405 sscanf(snum,"%d",&cur_epoch);
406 cur_epoch++; /* Let's not stone the last epoch */
414 /* Dumps the current epoch in a file to cope with
415 * reboots and killings of fprobe */
417 void update_cur_epoch_file(int n) {
419 char snum[MAX_EPOCH_SIZE];
420 len=snprintf(snum, MAX_EPOCH_SIZE-1,"%d", n);
421 fd = open(LAST_EPOCH_FILE, O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
423 my_log(LOG_ERR, "open() failed: %s.The next restart will resume logging from epoch id 0.",LAST_EPOCH_FILE);
/* NOTE(review): write() return value is ignored; a short/failed write
 * would silently lose the epoch marker. */
426 write(fd, snum, len);
430 /* Get the file descriptor corresponding to the current file.
431 * The kludgy implementation is to abstract away the 'current
432 * file descriptor', which may also be a socket.
/* Rotates the data file on epoch boundaries, gzip-compresses the
 * finished file, and throttles when free disk blocks drop below -D.
 * cur_fd == START_DATA_FD means no file has been opened yet. */
435 unsigned get_data_file_fd(char *fname, int cur_fd) {
439 struct statfs statfs;
442 /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
443 * doesn't solve the problem */
445 cur_uptime = getuptime_minutes(&now);
447 if (cur_fd != START_DATA_FD) {
448 if (fstatfs(cur_fd, &statfs) == -1) {
449 my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
452 if (min_free && (statfs.f_bavail < min_free)
453 && (cur_epoch==last_peak))
455 my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
460 assume that we can reclaim space by overwriting our own files
461 and that the difference in size will not fill the disk - sapan
466 /* If epoch length has been exceeded,
467 * or we're starting up
468 * or we're going back to the first epoch */
469 if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
471 prev_uptime = cur_uptime;
472 cur_epoch = (cur_epoch + 1) % log_epochs;
473 if (cur_epoch>last_peak) last_peak = cur_epoch;
476 /* Compress the finished file */
477 char gzip_cmd[MAX_PATH_LEN+sizeof("gzip -f ")];
478 snprintf(gzip_cmd, MAX_PATH_LEN+sizeof("gzip -f "),"gzip -f %s",cur_output_file);
/* NOTE(review): gzip_cmd is presumably passed to system() on an elided
 * line; fname is operator-supplied, but shell-injection risk applies if
 * it can ever come from untrusted input — confirm. */
481 snprintf(cur_output_file,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
482 if ((write_fd = open(cur_output_file, O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH)) < 0) {
483 my_log(LOG_ERR, "open(): %s (%s)\n", cur_output_file, strerror(errno));
486 if (fchmod(write_fd,S_IRUSR|S_IWUSR|S_IROTH|S_IRGRP) == -1) {
487 my_log(LOG_ERR, "fchmod() failed: %s (%s). Continuing...\n", cur_output_file, strerror(errno));
489 update_cur_epoch_file(cur_epoch);
/* Search the chain starting at `where` for a flow matching `what`:
 * same src/dst IP and protocol, then either matching ports (both
 * unfragmented) or matching IP id (both fragmented). On return, *prev
 * (if given) points at the link that holds the match — used by callers
 * to unlink the flow without re-walking the chain. */
498 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
500 struct Flow **flowpp;
506 if (prev) flowpp = *prev;
509 if (where->sip.s_addr == what->sip.s_addr
510 && where->dip.s_addr == what->dip.s_addr
511 && where->proto == what->proto) {
512 switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
514 /* Both unfragmented */
515 if ((what->sp == where->sp)
516 && (what->dp == where->dp)) goto done;
519 /* Both fragmented */
520 if (where->id == what->id) goto done;
524 flowpp = &where->next;
528 if (prev) *prev = flowpp;
/* Insert `flow` into the hash table bucket, merging with an existing
 * matching flow if found. flag selects ownership: COPY_INTO duplicates
 * into a freshly allocated record, MOVE_INTO donates `flow` itself
 * (and frees it after a merge). When the last fragment completes a
 * fragmented flow, the reassembled flow is re-inserted recursively
 * with MOVE_INTO. Holds flows_mutex[h] for the duration. */
532 int put_into(struct Flow *flow, int flag
533 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
540 struct Flow *flown, **flowpp;
541 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
546 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
547 sprintf(buf, " %x H:%04x", (unsigned) flow, h);
550 pthread_mutex_lock(&flows_mutex[h]);
552 if (!(flown = find(flows[h], flow, &flowpp))) {
553 /* No suitable flow found - add */
554 if (flag == COPY_INTO) {
555 if ((flown = mem_alloc())) {
556 copy_flow(flow, flown);
559 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
560 my_log(LOG_ERR, "%s %s. %s",
561 "mem_alloc():", strerror(errno), "packet lost");
566 flow->next = flows[h];
568 #if ((DEBUG) & DEBUG_I)
570 if (flow->flags & FLOW_FRAG) flows_fragmented++;
572 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
574 sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
579 /* Found suitable flow - update */
580 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
581 sprintf(buf, " +> %x", (unsigned) flown);
/* Merge accounting: widen [ctime, mtime] window, OR in TCP flags,
 * accumulate byte and packet counts. */
584 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
585 flown->mtime = flow->mtime;
586 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
587 flown->ctime = flow->ctime;
588 flown->tcp_flags |= flow->tcp_flags;
589 flown->size += flow->size;
590 flown->pkts += flow->pkts;
592 /* The slice_id of the first slice_id of a flow is misleading. Reset the slice_id of the flow
593 * if a better value comes along. A good example of this is that by the time CoDemux sets the
594 * peercred of a flow, it has already been accounted for here and attributed to root. */
596 if (flown->slice_id<1)
597 flown->slice_id = flow->slice_id;
600 if (flow->flags & FLOW_FRAG) {
601 /* Fragmented flow require some additional work */
602 if (flow->flags & FLOW_TL) {
605 Several packets with FLOW_TL (attack)
607 flown->sp = flow->sp;
608 flown->dp = flow->dp;
610 if (flow->flags & FLOW_LASTFRAG) {
613 Several packets with FLOW_LASTFRAG (attack)
615 flown->sizeP = flow->sizeP;
617 flown->flags |= flow->flags;
618 flown->sizeF += flow->sizeF;
619 if ((flown->flags & FLOW_LASTFRAG)
620 && (flown->sizeF >= flown->sizeP)) {
621 /* All fragments received - flow reassembled */
622 *flowpp = flown->next;
/* Unlock before the recursive re-insert: the reassembled flow may hash
 * to a different bucket, avoiding self-deadlock on flows_mutex[h]. */
623 pthread_mutex_unlock(&flows_mutex[h]);
624 #if ((DEBUG) & DEBUG_I)
629 flown->flags &= ~FLOW_FRAG;
630 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
633 ret = put_into(flown, MOVE_INTO
634 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
640 if (flag == MOVE_INTO) mem_free(flow);
642 pthread_mutex_unlock(&flows_mutex[h]);
/* Serialize `fields` NetFlow fields (per the `format` descriptor array)
 * into the output buffer at p, reading from `flow` (or from the global
 * emit_* state for header fields, where flow is NULL). Returns the
 * advanced write pointer. Values are written in network byte order
 * except where the flow already stores them that way (ports, slice_id). */
648 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
652 for (i = 0; i < fields; i++) {
653 #if ((DEBUG) & DEBUG_F)
654 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
657 case NETFLOW_IPV4_SRC_ADDR:
658 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
659 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
662 case NETFLOW_IPV4_DST_ADDR:
663 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
/* NOTE(review): hard-coded 10.0.0.8 is a leftover test hook (see the
 * matching check in the capture thread); inet_addr() runs per flow. */
664 if ((flow->dip.s_addr == inet_addr("10.0.0.8"))) {
665 my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
667 p += NETFLOW_IPV4_DST_ADDR_SIZE;
670 case NETFLOW_INPUT_SNMP:
671 *((uint16_t *) p) = htons(flow->iif);
672 p += NETFLOW_INPUT_SNMP_SIZE;
675 case NETFLOW_OUTPUT_SNMP:
676 *((uint16_t *) p) = htons(flow->oif);
677 p += NETFLOW_OUTPUT_SNMP_SIZE;
680 case NETFLOW_PKTS_32:
681 *((uint32_t *) p) = htonl(flow->pkts);
682 p += NETFLOW_PKTS_32_SIZE;
685 case NETFLOW_BYTES_32:
686 *((uint32_t *) p) = htonl(flow->size);
687 p += NETFLOW_BYTES_32_SIZE;
690 case NETFLOW_FIRST_SWITCHED:
691 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
692 p += NETFLOW_FIRST_SWITCHED_SIZE;
695 case NETFLOW_LAST_SWITCHED:
696 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
697 p += NETFLOW_LAST_SWITCHED_SIZE;
/* Ports are captured straight off the wire, already big-endian. */
700 case NETFLOW_L4_SRC_PORT:
701 *((uint16_t *) p) = flow->sp;
702 p += NETFLOW_L4_SRC_PORT_SIZE;
705 case NETFLOW_L4_DST_PORT:
706 *((uint16_t *) p) = flow->dp;
707 p += NETFLOW_L4_DST_PORT_SIZE;
711 *((uint8_t *) p) = flow->proto;
712 p += NETFLOW_PROT_SIZE;
715 case NETFLOW_SRC_TOS:
716 *((uint8_t *) p) = flow->tos;
717 p += NETFLOW_SRC_TOS_SIZE;
720 case NETFLOW_TCP_FLAGS:
721 *((uint8_t *) p) = flow->tcp_flags;
722 p += NETFLOW_TCP_FLAGS_SIZE;
725 case NETFLOW_VERSION:
726 *((uint16_t *) p) = htons(netflow->Version);
727 p += NETFLOW_VERSION_SIZE;
731 *((uint16_t *) p) = htons(emit_count);
732 p += NETFLOW_COUNT_SIZE;
736 *((uint32_t *) p) = htonl(getuptime(&emit_time));
737 p += NETFLOW_UPTIME_SIZE;
740 case NETFLOW_UNIX_SECS:
741 *((uint32_t *) p) = htonl(emit_time.sec);
742 p += NETFLOW_UNIX_SECS_SIZE;
745 case NETFLOW_UNIX_NSECS:
746 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
747 p += NETFLOW_UNIX_NSECS_SIZE;
750 case NETFLOW_FLOW_SEQUENCE:
/* NOTE(review): real sequence emission is commented out; the field is
 * zeroed — collectors relying on sequence gaps will see none. */
751 //*((uint32_t *) p) = htonl(emit_sequence);
752 *((uint32_t *) p) = 0;
753 p += NETFLOW_FLOW_SEQUENCE_SIZE;
757 /* Unsupported (uint8_t) */
758 case NETFLOW_ENGINE_TYPE:
759 case NETFLOW_ENGINE_ID:
760 case NETFLOW_FLAGS7_1:
761 case NETFLOW_SRC_MASK:
762 case NETFLOW_DST_MASK:
764 my_log(LOG_CRIT, "Adding SRC/DST masks: this version of fprobe is seriously broken\n");
767 *((uint8_t *) p) = 0;
768 p += NETFLOW_PAD8_SIZE;
770 case NETFLOW_SLICE_ID:
771 *((uint32_t *) p) = flow->slice_id;
772 p += NETFLOW_SLICE_ID_SIZE;
775 /* Unsupported (uint16_t) */
778 case NETFLOW_FLAGS7_2:
779 *((uint16_t *) p) = 0;
780 p += NETFLOW_PAD16_SIZE;
784 /* Unsupported (uint32_t) */
785 case NETFLOW_IPV4_NEXT_HOP:
786 case NETFLOW_ROUTER_SC:
787 *((uint32_t *) p) = 0;
788 p += NETFLOW_PAD32_SIZE;
792 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
793 format, i, format[i]);
797 #if ((DEBUG) & DEBUG_F)
798 my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
/* Emitter thread (fragment): drains flows_emit into a NetFlow PDU
 * buffer and, when full or on timeout, sends it to every configured
 * peer — file peers via write(), mirror peers to all, rotate peers
 * round-robin. Optional byte-rate throttling via nanosleep. */
805 Workaround for clone()-based threads
806 Try to change EUID independently of main thread
810 setregid(pw->pw_gid, pw->pw_gid);
811 setreuid(pw->pw_uid, pw->pw_uid);
820 struct timespec timeout;
821 int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
823 p = (void *) &emit_packet + netflow->HeaderSize;
829 pthread_mutex_lock(&emit_mutex);
830 while (!flows_emit) {
831 gettimeofday(&now, 0);
832 timeout.tv_sec = now.tv_sec + emit_timeout;
833 /* Do not wait until emit_packet will filled - it may be too long */
834 if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
835 //my_log(LOG_INFO,"Timeout: %d, %d",emit_count, timeout.tv_sec);
836 pthread_mutex_unlock(&emit_mutex);
841 flows_emit = flows_emit->next;
842 #if ((DEBUG) & DEBUG_I)
845 pthread_mutex_unlock(&emit_mutex);
849 gettime(&start_time);
850 start_time.sec -= start_time_offset;
853 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
/* NOTE(review): stray debug printf to stdout — consider my_log. */
857 printf("Emit count = %d\n", emit_count);
860 if (emit_count == netflow->MaxFlows) {
863 p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
864 size = netflow->HeaderSize + emit_count * netflow->FlowSize;
865 /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
866 #ifdef STD_NETFLOW_PDU
867 if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
870 for (i = 0; i < npeers; i++) {
871 if (peers[i].type == PEER_FILE) {
/* NOTE(review): seq written from peers[0] here but advanced per-peer
 * below (peers[i].seq) — looks inconsistent for multiple file peers. */
872 if (netflow->SeqOffset)
873 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
874 peers[i].write_fd = get_data_file_fd(peers[i].fname, peers[i].write_fd);
875 ret = write(peers[i].write_fd, emit_packet, size);
878 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
879 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
880 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
884 #if ((DEBUG) & DEBUG_E)
886 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
887 emit_count, i + 1, peers[i].seq);
890 peers[i].seq += emit_count;
/* Optional pacing: after emit_rate_bytes sent, sleep proportionally. */
893 if (emit_rate_bytes) {
895 delay = sent / emit_rate_bytes;
897 sent %= emit_rate_bytes;
899 timeout.tv_nsec = emit_rate_delay * delay;
900 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
905 if (peers[i].type == PEER_MIRROR) goto sendreal;
907 if (peers[i].type == PEER_ROTATE)
908 if (peer_rot_cur++ == peer_rot_work) {
910 if (netflow->SeqOffset)
911 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
912 ret = send(peers[i].write_fd, emit_packet, size, 0);
914 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
915 my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
916 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
919 #if ((DEBUG) & DEBUG_E)
921 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
922 emit_count, i + 1, peers[i].seq);
925 peers[i].seq += emit_count;
928 if (emit_rate_bytes) {
930 delay = sent / emit_rate_bytes;
932 sent %= emit_rate_bytes;
934 timeout.tv_nsec = emit_rate_delay * delay;
935 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
940 if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
941 emit_sequence += emit_count;
943 #if ((DEBUG) & DEBUG_I)
/* Consumer thread for the capture ring: waits (with timeout) until the
 * tail of the pending queue is marked FLOW_PENDING, copies each flow
 * into the hash table via put_into(COPY_INTO), then clears the slot and
 * advances the tail. Counts drops in pkts_lost_unpending (DEBUG_I). */
950 void *unpending_thread()
953 struct timespec timeout;
954 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
961 pthread_mutex_lock(&unpending_mutex);
964 while (!(pending_tail->flags & FLOW_PENDING)) {
965 gettimeofday(&now, 0);
966 timeout.tv_sec = now.tv_sec + unpending_timeout;
967 pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
970 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
973 if (put_into(pending_tail, COPY_INTO
974 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
978 #if ((DEBUG) & DEBUG_I)
979 pkts_lost_unpending++;
983 #if ((DEBUG) & DEBUG_U)
984 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
/* Clearing flags releases the slot back to the capture thread. */
987 pending_tail->flags = 0;
988 pending_tail = pending_tail->next;
989 #if ((DEBUG) & DEBUG_I)
/* Scanner thread (fragment; header elided): every scan_interval seconds
 * walks all hash buckets, expiring flows. Expired fragmented flows go
 * to the scan_frag_dreg chain and are re-inserted unfragmented;
 * inactive/over-age complete flows move to flows_emit for the emitter. */
997 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1001 struct Flow *flow, **flowpp;
1003 struct timespec timeout;
1007 timeout.tv_nsec = 0;
1008 pthread_mutex_lock(&scan_mutex);
1012 timeout.tv_sec = now.sec + scan_interval;
1013 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
1016 #if ((DEBUG) & DEBUG_S)
1017 my_log(LOG_DEBUG, "S: %d", now.sec);
1019 for (i = 0; i < 1 << HASH_BITS ; i++) {
1020 pthread_mutex_lock(&flows_mutex[i]);
1024 if (flow->flags & FLOW_FRAG) {
1025 /* Process fragmented flow */
1026 if ((now.sec - flow->mtime.sec) > frag_lifetime) {
1027 /* Fragmented flow expired - put it into special chain */
1028 #if ((DEBUG) & DEBUG_I)
1032 *flowpp = flow->next;
1034 flow->flags &= ~FLOW_FRAG;
1035 flow->next = scan_frag_dreg;
1036 scan_frag_dreg = flow;
1041 /* Flow is not fragmented */
1042 if ((now.sec - flow->mtime.sec) > inactive_lifetime
1043 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
1045 #if ((DEBUG) & DEBUG_S)
1046 my_log(LOG_DEBUG, "S: E %x", flow);
1048 #if ((DEBUG) & DEBUG_I)
1051 *flowpp = flow->next;
1052 pthread_mutex_lock(&emit_mutex);
1053 flow->next = flows_emit;
1055 #if ((DEBUG) & DEBUG_I)
1058 pthread_mutex_unlock(&emit_mutex);
1063 flowpp = &flow->next;
1066 pthread_mutex_unlock(&flows_mutex[i]);
1068 if (flows_emit) pthread_cond_signal(&emit_cond);
/* Re-insert expired fragments as ordinary flows (best effort). */
1070 while (scan_frag_dreg) {
1071 flow = scan_frag_dreg;
1072 scan_frag_dreg = flow->next;
1073 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1076 put_into(flow, MOVE_INTO
1077 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1081 #if ((DEBUG) & DEBUG_S)
1082 my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
/* Capture thread (fragment; header elided): reads packets from the
 * kernel via libipulog, parses the IPv4/transport headers into the next
 * free slot of the pending ring (pending_head), handles fragmentation
 * bookkeeping, then marks the slot FLOW_PENDING and signals
 * unpending_thread. */
1090 struct ulog_packet_msg *ulog_msg;
1094 int len, off_frag, psize;
1095 #if ((DEBUG) & DEBUG_C)
1104 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1106 my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1109 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1111 #if ((DEBUG) & DEBUG_C)
1112 sprintf(logbuf, "C: %d", ulog_msg->data_len);
1115 nl = (void *) &ulog_msg->payload;
1116 psize = ulog_msg->data_len;
/* Drop anything that isn't a plausible IPv4 packet. */
1119 if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1120 #if ((DEBUG) & DEBUG_C)
1121 strcat(logbuf, " U");
1122 my_log(LOG_DEBUG, "%s", logbuf);
1124 #if ((DEBUG) & DEBUG_I)
/* Non-zero flags on the head slot means the consumer hasn't freed it:
 * the pending ring is full and this packet is lost. */
1130 if (pending_head->flags) {
1131 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1133 # if ((DEBUG) & DEBUG_C)
1138 "pending queue full:", "packet lost");
1140 #if ((DEBUG) & DEBUG_I)
1141 pkts_lost_capture++;
1146 #if ((DEBUG) & DEBUG_I)
1150 flow = pending_head;
1152 /* ?FIXME? Add sanity check for ip_len? */
1153 flow->size = ntohs(nl->ip_len);
1154 #if ((DEBUG) & DEBUG_I)
1155 size_total += flow->size;
1158 flow->sip = nl->ip_src;
1159 flow->dip = nl->ip_dst;
1160 flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1162 /* It's going to be expensive calling this syscall on every flow.
1163 * We should keep a local hash table, for now just bear the overhead... - Sapan*/
1167 if (ulog_msg->mark > 0) {
1168 flow->slice_id = xid_to_slice_id(ulog_msg->mark);
1171 if (flow->slice_id < 1)
1172 flow->slice_id = ulog_msg->mark; // Couldn't look up the slice id, let's at least store the local xid
/* NOTE(review): hard-coded 10.0.0.8 test hook, mirrored in fill();
 * inet_addr() is evaluated for every captured packet. */
1175 if ((flow->dip.s_addr == inet_addr("10.0.0.8")) || (flow->sip.s_addr == inet_addr("10.0.0.8"))) {
1176 my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->slice_id);
1178 flow->iif = snmp_index(ulog_msg->indev_name);
1179 flow->oif = snmp_index(ulog_msg->outdev_name);
1180 flow->proto = nl->ip_p;
1182 flow->tcp_flags = 0;
1186 /* Packets captured from the OUTPUT table don't contain a valid timestamp */
1187 if (ulog_msg->timestamp_sec) {
1188 flow->ctime.sec = ulog_msg->timestamp_sec;
1189 flow->ctime.usec = ulog_msg->timestamp_usec;
1190 } else gettime(&flow->ctime);
1191 flow->mtime = flow->ctime;
1193 off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1196 Offset (from network layer) to transport layer header/IP data
1197 IOW IP header size ;-)
1200 Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1202 off_tl = nl->ip_hl << 2;
1203 tl = (void *) nl + off_tl;
1205 /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1206 flow->sizeF = ntohs(nl->ip_len) - off_tl;
1208 if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1209 if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1211 if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1212 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1213 #if ((DEBUG) & DEBUG_C)
1214 strcat(logbuf, " F");
1216 #if ((DEBUG) & DEBUG_I)
1217 pkts_total_fragmented++;
1219 flow->flags |= FLOW_FRAG;
1220 flow->id = nl->ip_id;
1222 if (!(ntohs(nl->ip_off) & IP_MF)) {
1223 /* Packet without IP_MF (the last fragment) tells us the whole datagram size */
1224 flow->flags |= FLOW_LASTFRAG;
1225 /* size = frag_offset*8 + data_size */
1226 flow->sizeP = off_frag + flow->sizeF;
1230 #if ((DEBUG) & DEBUG_C)
1231 sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1232 strcat(logbuf, buf);
1233 sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1234 strcat(logbuf, buf);
1238 Fortunately most interesting transport layer information fit
1239 into first 8 bytes of IP data field (minimal nonzero size).
1240 Thus we don't need actual packet reassembling to build whole
1241 transport layer data. We only check the fragment offset for
1242 zero value to find packet with this information.
1244 if (!off_frag && psize >= 8) {
1245 switch (flow->proto) {
1248 flow->sp = ((struct udphdr *)tl)->uh_sport;
1249 flow->dp = ((struct udphdr *)tl)->uh_dport;
/* ICMP: encode type/code into the port fields, NetFlow-style. */
1254 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1255 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1258 #ifdef ICMP_TRICK_CISCO
1260 flow->dp = *((int32_t *) tl);
1265 /* Unknown transport layer */
1266 #if ((DEBUG) & DEBUG_C)
1267 strcat(logbuf, " U");
1274 #if ((DEBUG) & DEBUG_C)
1275 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1276 strcat(logbuf, buf);
1278 flow->flags |= FLOW_TL;
1282 /* Check for tcp flags presence (including CWR and ECE). */
1283 if (flow->proto == IPPROTO_TCP
1285 && psize >= 16 - off_frag) {
1286 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1287 #if ((DEBUG) & DEBUG_C)
1288 sprintf(buf, " TCP:%x", flow->tcp_flags);
1289 strcat(logbuf, buf);
1293 #if ((DEBUG) & DEBUG_C)
1294 sprintf(buf, " => %x", (unsigned) flow);
1295 strcat(logbuf, buf);
1296 my_log(LOG_DEBUG, "%s", logbuf);
1299 #if ((DEBUG) & DEBUG_I)
1301 pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1302 if (pending_queue_trace < pending_queue_trace_candidate)
1303 pending_queue_trace = pending_queue_trace_candidate;
1306 /* Flow complete - inform unpending_thread() about it */
1307 pending_head->flags |= FLOW_PENDING;
1308 pending_head = pending_head->next;
1310 pthread_cond_signal(&unpending_cond);
1316 /* Copied out of CoDemux */
/* Daemonize: write the child PID to PIDFILE from the parent, then in
 * the child detach from the controlling terminal (setsid etc. on
 * elided lines) and chdir to the working directory. */
1318 static int init_daemon() {
1322 pidfile = fopen(PIDFILE, "w");
1323 if (pidfile == NULL) {
1324 my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
1327 if ((pid = fork()) < 0) {
1329 my_log(LOG_ERR, "Could not fork!\n");
1332 else if (pid != 0) {
1333 /* i'm the parent, writing down the child pid */
1334 fprintf(pidfile, "%u\n", pid);
1339 /* close the pid file */
1342 /* routines for any daemon process
1343 1. create a new session
1344 2. change directory to the root
1345 3. change the file creation permission
/* NOTE(review): chdir() return value ignored; failure would leave the
 * daemon in an unexpected working directory. */
1348 chdir("/var/local/fprobe");
1354 int main(int argc, char **argv)
1357 char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1358 int c, i, write_fd, memory_limit = 0;
1359 struct addrinfo hints, *res;
1360 struct sockaddr_in saddr;
1361 pthread_attr_t tattr;
1362 struct sigaction sigact;
1363 static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1364 struct timeval timeout;
1366 sched_min = sched_get_priority_min(SCHED);
1367 sched_max = sched_get_priority_max(SCHED);
1369 memset(&saddr, 0 , sizeof(saddr));
1370 memset(&hints, 0 , sizeof(hints));
1371 hints.ai_flags = AI_PASSIVE;
1372 hints.ai_family = AF_INET;
1373 hints.ai_socktype = SOCK_DGRAM;
1375 /* Process command line options */
1378 while ((c = my_getopt(argc, argv, parms)) != -1) {
1388 if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
/*
 * NOTE(review): fragment of main() — header and many intermediate lines are
 * not visible here, so the comments below describe only what this view shows.
 * Numeric options are converted with atoi(), which reports no errors;
 * NOTE(review): strtol() with range checks would reject garbage input.
 */
/* Epoch / timing / cache-tuning options (seconds and counts). */
1389 if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1390 if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1391 if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1392 if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1393 if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1394 if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1395 if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1396 if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
/* -n: select the NetFlow PDU version; unknown values are rejected. */
1397 if (parms[nflag].count) {
1398 switch (atoi(parms[nflag].arg)) {
1400 netflow = &NetFlow1;
1407 netflow = &NetFlow7;
1411 fprintf(stderr, "Illegal %s\n", "NetFlow version");
/* -v / -l: verbosity and log destination; an optional ":suffix" on -l is
 * split off, appended to ident as "[suffix]", then the ':' is restored so
 * the original argv text stays intact for the later options log line.
 * NOTE(review): sprintf/strcat into errpbuf/ident are unbounded — assumes
 * the suffix fits; confirm buffer sizes at their definitions. */
1415 if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1416 if (parms[lflag].count) {
1417 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1420 sprintf(errpbuf, "[%s]", log_suffix);
1421 strcat(ident, errpbuf);
1424 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1425 if (log_suffix) *--log_suffix = ':';
/* Build "<PID_DIR>/<ident>.pid"; the malloc size accounts for the
 * separator, ".pid" (1 + 3) and the NUL. */
1427 if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1429 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1432 sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
/* -q: pending queue length, must be >= 1. */
1433 if (parms[qflag].count) {
1434 pending_queue_length = atoi(parms[qflag].arg);
1435 if (pending_queue_length < 1) {
1436 fprintf(stderr, "Illegal %s\n", "pending queue length");
/* -r: realtime scheduling priority; 0 disables, otherwise it must lie
 * within [sched_min, sched_max] queried earlier (not visible here). */
1440 if (parms[rflag].count) {
1441 schedp.sched_priority = atoi(parms[rflag].arg);
1442 if (schedp.sched_priority
1443 && (schedp.sched_priority < sched_min
1444 || schedp.sched_priority > sched_max)) {
1445 fprintf(stderr, "Illegal %s\n", "realtime priority");
/* -B / -b / -m: socket buffer (KiB -> bytes), bulk allocation quantum,
 * and memory limit (KiB -> bytes). */
1449 if (parms[Bflag].count) {
1450 sockbufsize = atoi(parms[Bflag].arg) << 10;
1452 if (parms[bflag].count) {
1453 bulk_quantity = atoi(parms[bflag].arg);
1454 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1455 fprintf(stderr, "Illegal %s\n", "bulk size");
1459 if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
/* -X: SNMP interface rules "basename:base,...". Rules are counted by the
 * ':' occurrences, then parsed destructively with strtok(); the clobbered
 * separator is patched back to ',' so argv can be re-read later.
 * NOTE(review): strtok() is not thread-safe (single-threaded here during
 * startup, so acceptable); strncpy() at 1472 copies exactly len bytes and
 * does NOT NUL-terminate — verify basename[] is pre-zeroed or len < IFNAMSIZ
 * strictly, else later "%s" printing of basename reads past the field. */
1460 if (parms[Xflag].count) {
1461 for(i = 0; parms[Xflag].arg[i]; i++)
1462 if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1463 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1465 rule = strtok(parms[Xflag].arg, ":");
1466 for (i = 0; rule; i++) {
1467 snmp_rules[i].len = strlen(rule);
1468 if (snmp_rules[i].len > IFNAMSIZ) {
1469 fprintf(stderr, "Illegal %s\n", "interface basename");
1472 strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1473 if (!*(rule - 1)) *(rule - 1) = ',';
1474 rule = strtok(NULL, ",");
1476 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1479 snmp_rules[i].base = atoi(rule);
1481 rule = strtok(NULL, ":");
/* -t: emit rate limit as "bytes:delay". */
1485 if (parms[tflag].count)
1486 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
/* -a: local source address for outgoing NetFlow packets. */
1487 if (parms[aflag].count) {
1488 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1490 fprintf(stderr, "Illegal %s\n", "source address");
1493 saddr = *((struct sockaddr_in *) res->ai_addr);
/* -u: user to drop privileges to (applied after thread start, at 1707+). */
1497 if (parms[uflag].count)
1498 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1499 fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1504 /* Process collectors parameters. Brrrr... :-[ */
1506 npeers = argc - optind;
1508 /* Send to remote Netflow collector */
/* One UDP peer per remaining argv entry, syntax "host:port[/lhost[/type]]".
 * Separators clobbered during parsing are restored at 1565-1566 so the
 * command line can still be logged verbatim. */
1509 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1510 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1512 if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1514 if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1515 fprintf(stderr, "socket(): %s\n", strerror(errno));
1518 peers[npeers].write_fd = write_fd;
1519 peers[npeers].type = PEER_MIRROR;
1520 peers[npeers].laddr = saddr;
1521 peers[npeers].seq = 0;
1522 if ((lhost = strchr(dport, '/'))) {
1524 if ((type = strchr(lhost, '/'))) {
1532 peers[npeers].type = PEER_ROTATE;
1541 if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1542 peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1546 if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1547 sizeof(struct sockaddr_in))) {
1548 fprintf(stderr, "bind(): %s\n", strerror(errno));
1551 if (getaddrinfo(dhost, dport, &hints, &res)) {
1553 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1556 peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
/* connect() the UDP socket so plain write()/send() reaches the collector. */
1558 if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1559 sizeof(struct sockaddr_in))) {
1560 fprintf(stderr, "connect(): %s\n", strerror(errno));
1564 /* Restore command line */
1565 if (type) *--type = '/';
1566 if (lhost) *--lhost = '/';
/* -f: no network collectors — write flow data to a local file instead.
 * NOTE(review): bug — malloc(strnlen(arg, MAX_PATH_LEN)) reserves no byte
 * for the NUL terminator, and the following strncpy copies up to
 * MAX_PATH_LEN bytes into that undersized buffer: possible heap overflow
 * and/or unterminated fname. Should be strnlen(...)+1 (or strdup) with a
 * bounded, always-terminated copy. */
1570 else if (parms[fflag].count) {
1572 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1573 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1574 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1576 peers[npeers].write_fd = START_DATA_FD;
1577 peers[npeers].type = PEER_FILE;
1578 peers[npeers].seq = 0;
/* Open the kernel ULOG capture handle and enlarge its receive buffer. */
1587 if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1588 ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1590 fprintf(stderr, "libipulog initialization error: %s",
1591 ipulog_strerror(ipulog_errno));
/* setsockopt failure is non-fatal: only warn. */
1595 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1596 &sockbufsize, sizeof(sockbufsize)) < 0)
1597 fprintf(stderr, "setsockopt(): %s", strerror(errno));
1599 /* Daemonize (if log destination stdout-free) */
1601 my_log_open(ident, verbosity, log_dest);
/* log_dest bit 1 (value 2) appears to mean "log to stdout"; when clear,
 * fork into the background and detach from the terminal.
 * NOTE(review): the parent's wait3() loop at 1621 suggests a supervisor
 * pattern (parent restarts crashed child) — confirm against the elided
 * lines around 1606-1620. */
1605 if (!(log_dest & 2)) {
1606 /* Crash-proofing - Sapan*/
1610 fprintf(stderr, "fork(): %s", strerror(errno));
1615 freopen("/dev/null", "r", stdin);
1616 freopen("/dev/null", "w", stdout);
1617 freopen("/dev/null", "w", stderr);
1621 while (wait3(NULL,0,NULL) < 1);
/* Foreground path: unbuffered stdio so log lines appear immediately. */
1625 setvbuf(stdout, (char *)0, _IONBF, 0);
1626 setvbuf(stderr, (char *)0, _IONBF, 0);
/* Append "[pid]" to the logging ident. */
1630 sprintf(errpbuf, "[%ld]", (long) pid);
1631 strcat(ident, errpbuf);
1633 /* Initialization */
/* Hash tables, slab allocator for struct Flow, and one mutex per
 * hash bucket (2^HASH_BITS buckets). */
1635 init_slice_id_hash();
1636 hash_init(); /* Actually for crc16 only */
1637 mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1638 for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1641 /* Hope 12 days is enough :-/ */
1642 start_time_offset = 1 << 20;
1644 /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1646 gettime(&start_time);
1649 Build static pending queue as circular buffer.
/* Pre-allocate pending_queue_length nodes and link them into a ring;
 * head == tail means the ring starts empty. */
1651 if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1652 pending_tail = pending_head;
1653 for (i = pending_queue_length - 1; i--;) {
1654 if (!(pending_tail->next = mem_alloc())) {
1656 my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1659 pending_tail = pending_tail->next;
1661 pending_tail->next = pending_head;
1662 pending_tail = pending_head;
/* Install the SIGTERM (and, in debug builds, SIGUSR1) handler, then block
 * the set so only the main thread — which unblocks at 1754 — receives it;
 * worker threads inherit the blocked mask via pthread_create. */
1664 sigemptyset(&sig_mask);
1665 sigact.sa_handler = &sighandler;
1666 sigact.sa_mask = sig_mask;
1667 sigact.sa_flags = 0;
1668 sigaddset(&sig_mask, SIGTERM);
1669 sigaction(SIGTERM, &sigact, 0);
1670 #if ((DEBUG) & DEBUG_I)
1671 sigaddset(&sig_mask, SIGUSR1);
1672 sigaction(SIGUSR1, &sigact, 0);
1674 if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1675 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1679 my_log(LOG_INFO, "Starting %s...", VERSION);
/* -c: chroot into the given directory (requires root at this point). */
1681 if (parms[cflag].count) {
1682 if (chdir(parms[cflag].arg) || chroot(".")) {
1683 my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
/* Spawn the THREADS-1 worker threads (detached). Each successive thread
 * gets priority+1, so the base is rebased by -THREADS+2 first; priority 0
 * means "no realtime scheduling" and skips the sched attr calls. */
1688 schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1689 pthread_attr_init(&tattr);
1690 for (i = 0; i < THREADS - 1; i++) {
1691 if (schedp.sched_priority > 0) {
1692 if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1693 (pthread_attr_setschedparam(&tattr, &schedp))) {
1694 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1698 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1699 my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1702 pthread_detach(thid);
1703 schedp.sched_priority++;
/* Drop privileges: supplementary groups first, then gid, then uid —
 * this order is required or the later calls would lack permission. */
1707 if (setgroups(0, NULL)) {
1708 my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1711 if (setregid(pw->pw_gid, pw->pw_gid)) {
1712 my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1715 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1716 my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
/* Write the pidfile; failure to create it is logged but not fatal.
 * NOTE(review): elided lines presumably guard the fprintf with an else —
 * as shown, fprintf(pidfile, ...) after a failed fopen would be a NULL
 * dereference; confirm against lines 1722-1724. */
1721 if (!(pidfile = fopen(pidfilepath, "w")))
1722 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1724 fprintf(pidfile, "%ld\n", (long) pid);
/* Log the effective configuration, SNMP rules, and collector list. */
1728 my_log(LOG_INFO, "pid: %d", pid);
1729 my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1730 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1731 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1732 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1733 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1734 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1735 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1736 for (i = 0; i < nsnmp_rules; i++) {
1737 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1738 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1740 for (i = 0; i < npeers; i++) {
1741 switch (peers[i].type) {
/* errpbuf holds the local-address string because inet_ntoa() returns a
 * static buffer — two inet_ntoa() results cannot share one printf call. */
1749 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1750 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1751 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
/* Main loop: unblock signals, then sleep in select() for scan_interval
 * at a time while flows remain cached or pending (loop condition is
 * partially elided here). */
1754 pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1756 timeout.tv_usec = 0;
1758 || (total_elements - free_elements - pending_queue_length)
1760 || pending_tail->flags) {
1763 timeout.tv_sec = scan_interval;
1764 select(0, 0, 0, 0, &timeout);
/* SIGTERM: force-expire everything (lifetimes = -1), wake the scan and
 * unpending threads, and let the loop drain the cache before exit. */
1767 if (sigs & SIGTERM_MASK && !killed) {
1768 sigs &= ~SIGTERM_MASK;
1769 my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1772 active_lifetime = -1;
1773 inactive_lifetime = -1;
1775 unpending_timeout = 1;
1777 pthread_cond_signal(&scan_cond);
1778 pthread_cond_signal(&unpending_cond);
1781 #if ((DEBUG) & DEBUG_I)
/* Debug builds: SIGUSR1 dumps internal state (body elided). */
1782 if (sigs & SIGUSR1_MASK) {
1783 sigs &= ~SIGUSR1_MASK;
/* Shutdown: remove the pidfile and log completion. */
1788 remove(pidfilepath);
1789 #if ((DEBUG) & DEBUG_I)
1792 my_log(LOG_INFO, "Done.");