2 Copyright (C) Slava Astashonok <sla@0n.ru>
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License.
7 $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
9 Sapan Bhatia <sapanb@cs.princeton.edu>
11 7/11/2007 Added data collection (-f) functionality, xid support in the header and log file
14 15/11/2007 Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
20 /* stdout, stderr, freopen() */
26 /* getopt(), alarm(), getpid(), setsid(), chdir() */
39 #include <libipulog/libipulog.h>
40 struct ipulog_handle {
43 struct sockaddr_nl local;
44 struct sockaddr_nl peer;
45 struct nlmsghdr* last_nlhdr;
48 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
49 #include <sys/types.h>
50 #include <netinet/in_systm.h>
51 #include <sys/socket.h>
52 #include <netinet/in.h>
53 #include <arpa/inet.h>
54 #include <netinet/ip.h>
55 #include <netinet/tcp.h>
56 #include <netinet/udp.h>
57 #include <netinet/ip_icmp.h>
60 #include <sys/param.h>
85 #include <sys/select.h>
91 #include <fprobe-ulog.h>
93 #include <my_getopt.h>
98 #define PIDFILE "/var/log/fprobe-ulog.pid"
99 #define STD_NETFLOW_PDU
/* Command-line option table for my_getopt(); every listed option takes a
 * required argument. Flag indices (aflag, Bflag, ...) used throughout main()
 * are derived from positions in this array.
 * NOTE(review): this extract is a sampled fragment - the entries at original
 * lines 140/143 (and the table terminator) are not visible here. */
129 static struct getopt_parms parms[] = {
130 {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
131 {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
132 {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
133 {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
134 {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
135 {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
136 {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
137 {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
138 {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
139 {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
141 {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
142 {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
144 {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
145 {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
146 {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
147 {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
148 {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
149 {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
150 {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
151 {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
152 {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
153 {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
/* --- File-scope state --------------------------------------------------- */
158 extern int optind, opterr, optopt;
/* NetFlow export format descriptors (defined elsewhere in the project). */
161 extern struct NetFlow NetFlow1;
162 extern struct NetFlow NetFlow5;
163 extern struct NetFlow NetFlow7;
/* Sentinel fd value meaning "log file not opened yet" (see get_log_fd()). */
165 #define START_VALUE -5
166 #define mark_is_tos parms[Mflag].count
/* Tunables settable from the command line (see usage()). */
167 static unsigned scan_interval = 5;
168 static int min_free = 0;
169 static int frag_lifetime = 30;
170 static int inactive_lifetime = 60;
171 static int active_lifetime = 300;
172 static int sockbufsize;
173 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
174 #if (MEM_BITS == 0) || (MEM_BITS == 16)
175 #define BULK_QUANTITY 10000
177 #define BULK_QUANTITY 200
/* Epoch-based log rotation state (minutes; see get_log_fd()). */
180 static unsigned epoch_length=60, log_epochs=1;
181 static unsigned cur_epoch=0,prev_uptime=0,last_peak=0;
183 static unsigned bulk_quantity = BULK_QUANTITY;
184 static unsigned pending_queue_length = 100;
185 static struct NetFlow *netflow = &NetFlow5;
186 static unsigned verbosity = 6;
187 static unsigned log_dest = MY_LOG_SYSLOG;
/* Process start time; uptime values are computed relative to this. */
188 static struct Time start_time;
189 static long start_time_offset;
/* Counters exported by the memory subsystem (mem.c, presumably). */
192 extern unsigned total_elements;
193 extern unsigned free_elements;
194 extern unsigned total_memory;
/* Statistics kept only when DEBUG_I instrumentation is compiled in. */
195 #if ((DEBUG) & DEBUG_I)
196 static unsigned emit_pkts, emit_queue;
197 static uint64_t size_total;
198 static unsigned pkts_total, pkts_total_fragmented;
199 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
200 static unsigned pkts_pending, pkts_pending_done;
201 static unsigned pending_queue_trace, pending_queue_trace_candidate;
202 static unsigned flows_total, flows_fragmented;
/* Emitter state: flows per PDU, export sequence, rate limiting. */
204 static unsigned emit_count;
205 static uint32_t emit_sequence;
206 static unsigned emit_rate_bytes, emit_rate_delay;
207 static struct Time emit_time;
208 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
209 static pthread_t thid;
210 static sigset_t sig_mask;
211 static struct sched_param schedp;
212 static int sched_min, sched_max;
/* Collector peers (UDP mirror/rotate targets or a local file). */
213 static int npeers, npeers_rot;
214 static struct peer *peers;
/* Flow hash table, one mutex per bucket. */
217 static struct Flow *flows[1 << HASH_BITS];
218 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
/* Synchronization between cap/unpending/scan/emit threads. */
220 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
221 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
223 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
224 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
/* Ring of pre-allocated pending flows (producer: cap, consumer: unpending). */
225 static struct Flow *pending_head, *pending_tail;
226 static struct Flow *scan_frag_dreg;
228 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
229 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
230 static struct Flow *flows_emit;
/* Identity / pidfile / capture handles. */
232 static char ident[256] = "fprobe-ulog";
233 static FILE *pidfile;
234 static char *pidfilepath;
237 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
238 static struct ipulog_handle *ulog_handle;
239 static uint32_t ulog_gmask = 1;
240 static char *cap_buf;
241 static int nsnmp_rules;
242 static struct snmp_rule *snmp_rules;
243 static struct passwd *pw = 0;
248 "fprobe-ulog: a NetFlow probe. Version %s\n"
249 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
251 "-h\t\tDisplay this help\n"
252 "-U <mask>\tULOG group bitwise mask [1]\n"
253 "-s <seconds>\tHow often scan for expired flows [5]\n"
254 "-g <seconds>\tFragmented flow lifetime [30]\n"
255 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
256 "-f <filename>\tLog flow data in a file\n"
257 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
258 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
259 "-a <address>\tUse <address> as source for NetFlow flow\n"
260 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
261 "-M\t\tUse netfilter mark value as ToS flag\n"
262 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
263 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
264 "-q <flows>\tPending queue length [100]\n"
265 "-B <kilobytes>\tKernel capture buffer size [0]\n"
266 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
267 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
268 "-c <directory>\tDirectory to chroot to\n"
269 "-u <user>\tUser to run as\n"
270 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
271 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
272 "-y <remote:port>\tAddress of the NetFlow collector\n"
273 "-f <writable file>\tFile to write data into\n"
274 "-T <n>\tRotate log file every n epochs\n"
275 "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
276 "-E <[1..60]>\tSize of an epoch in minutes\n"
277 "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
279 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
/* Dump the DEBUG_I statistics counters to the log (triggered by SIGUSR1,
 * see sighandler()). Compiled in only when DEBUG_I instrumentation is on.
 * NOTE(review): sampled fragment - the enclosing function's definition line
 * (presumably a static info()/dump routine) is not visible in this extract. */
283 #if ((DEBUG) & DEBUG_I)
/* received: total/fragmented packet counts plus total captured bytes. */
286 my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
287 pkts_total, pkts_total_fragmented, size_total,
288 pkts_pending - pkts_pending_done, pending_queue_trace);
289 my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
290 pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
291 my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
292 flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
293 my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
294 total_elements, free_elements, total_memory);
/* Signal handler: records received signals as bits in the global `sigs`
 * mask; the main loop polls that mask and reacts outside signal context.
 * NOTE(review): sampled fragment - the dispatch on `sig` (likely a switch)
 * around these two assignments is not visible in this extract. */
298 void sighandler(int sig)
/* SIGTERM path: request an orderly shutdown. */
302 sigs |= SIGTERM_MASK;
304 #if ((DEBUG) & DEBUG_I)
/* SIGUSR1 path: request a dump of the DEBUG_I statistics. */
306 sigs |= SIGUSR1_MASK;
/* Fill *now with the current wall-clock time (struct Time holds sec/usec).
 * NOTE(review): sampled fragment - the gettimeofday() call into `t` and the
 * `now->sec` assignment are not visible in this extract. */
312 void gettime(struct Time *now)
318 now->usec = t.tv_usec;
/* Difference t1 - t2 in whole minutes. Only the seconds fields are used;
 * the usec parts are ignored (minute granularity is enough for epochs). */
322 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
324 return (t1->sec - t2->sec)/60;
/* Difference t1 - t2 in milliseconds, combining the sec and usec parts. */
327 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
329 return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
332 /* Uptime in milliseconds */
/* Milliseconds elapsed between process start (start_time) and *t.
 * Returned as uint32_t, so it wraps after 2^32 ms. */
333 uint32_t getuptime(struct Time *t)
335 /* Maximum uptime is about 49/2 days */
336 return cmpmtime(t, &start_time);
339 /* Uptime in minutes */
/* Minutes elapsed since process start; used for epoch-based log rotation
 * in get_log_fd(). */
340 uint32_t getuptime_minutes(struct Time *t)
342 /* Maximum uptime is about 49/2 days */
343 return cmpMtime(t, &start_time);
/* Hash a flow for the flows[] table. A fragmented flow is hashed over the
 * fragment key only (struct Flow_F); a complete flow is hashed over the
 * larger key that includes transport-layer fields (struct Flow_TL). */
346 hash_t hash_flow(struct Flow *flow)
348 if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
349 else return hash(flow, sizeof(struct Flow_TL));
/* Map an interface name to an SNMP interface index.
 * Resolution order: 1) empty name -> 0; 2) user-supplied -X rules: if the
 * name starts with a rule's basename, index = numeric suffix + rule base;
 * 3) fall back to the kernel's if_nametoindex().
 * NOTE(review): sampled fragment - the declaration of `i`, loop brace and
 * the final fallback return are not visible in this extract. */
352 uint16_t snmp_index(char *name) {
355 if (!*name) return 0;
357 for (i = 0; (int) i < nsnmp_rules; i++) {
358 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
/* Rule matched: take the digits after the basename and add the base. */
359 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
362 if ((i = if_nametoindex(name))) return i;
/* Field-by-field copy of a Flow; used when a captured packet's pending Flow
 * is inserted into the hash table (put_into() with COPY_INTO).
 * NOTE(review): sampled fragment - copies of the address/port/id fields
 * (original lines 368-373 and 376-378) are not visible in this extract. */
367 inline void copy_flow(struct Flow *src, struct Flow *dst)
374 dst->proto = src->proto;
375 dst->tcp_flags = src->tcp_flags;
379 dst->pkts = src->pkts;
380 dst->size = src->size;
/* sizeF/sizeP: accumulated fragment bytes / expected whole-datagram size. */
381 dst->sizeF = src->sizeF;
382 dst->sizeP = src->sizeP;
383 dst->ctime = src->ctime;
384 dst->mtime = src->mtime;
385 dst->flags = src->flags;
/* Restore the epoch counter persisted by update_cur_epoch_file() so that a
 * restarted probe resumes after the epoch it was writing when it stopped,
 * instead of clobbering that log file.
 * NOTE(review): sampled fragment - the fd/len/snum declarations, the error
 * paths for open()/read(), NUL-termination of snum, and close(fd) are not
 * visible in this extract. */
388 void get_cur_epoch() {
390 fd = open("/tmp/fprobe_last_epoch",O_RDONLY);
394 len = read(fd, snum, sizeof(snum)-1);
397 sscanf(snum,"%d",&cur_epoch);
/* Step past the epoch that was in progress at shutdown. */
398 cur_epoch++; /* Let's not stone the last epoch */
/*
 * Persist the current epoch number to /tmp/fprobe_last_epoch so that a
 * restarted fprobe can resume logging from the epoch after the one that was
 * being written when it stopped (see get_cur_epoch()).
 *
 * n: epoch number to record.
 *
 * Fix: the original called open() with O_CREAT but no `mode` argument.
 * POSIX requires a third argument when O_CREAT is set; omitting it is
 * undefined behavior and creates the file with garbage permissions.
 * An explicit 0644 is now passed, and the fd is checked and closed.
 */
void update_cur_epoch_file(int n) {
	int fd, len;
	char snum[16];

	len = snprintf(snum, sizeof(snum), "%d", n);
	fd = open("/tmp/fprobe_last_epoch", O_WRONLY | O_CREAT | O_TRUNC, 0644);
	if (fd == -1) {
		my_log(LOG_ERR, "open() failed: /tmp/fprobe_last_epoch.The next restart will resume logging from epoch id 0.");
		return;
	}
	/* Best-effort write: a short write only costs us the resume point. */
	write(fd, snum, len);
	close(fd);
}
/* Return a descriptor for the current epoch's log file, rotating to a new
 * file when the epoch expires and backing off when the disk is nearly full.
 * fname:  log file base name (-f option); ".<epoch>" is appended.
 * cur_fd: fd returned by the previous call, or START_VALUE on first use.
 * NOTE(review): sampled fragment - several declarations, returns and
 * closing braces are not visible in this extract. */
419 unsigned get_log_fd(char *fname, int cur_fd) {
422 /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
423 * doesn't solve the problem */
425 struct statfs statfs;
/* Minutes since start; epochs are measured in minutes. */
428 cur_uptime = getuptime_minutes(&now);
431 if (cur_fd!=START_VALUE) {
432 if (fstatfs(cur_fd, &statfs) == -1) {
433 my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
/* Disk low while sitting on the newest epoch: start dropping data. */
436 if (min_free && (statfs.f_bavail < min_free)
437 && (cur_epoch==last_peak))
439 my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
444 assume that we can reclaim space by overwriting our own files
445 and that the difference in size will not fill the disk - sapan
450 /* Epoch length in minutes */
/* NOTE(review): cur_epoch is declared `unsigned` (see file-scope decls),
 * so `cur_epoch==-1` actually compares against UINT_MAX - confirm intent. */
451 if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
452 char nextname[MAX_PATH_LEN];
454 prev_uptime = cur_uptime;
/* Rotate: wrap after log_epochs files, reusing the oldest names. */
455 cur_epoch = (cur_epoch + 1) % log_epochs;
456 last_peak = cur_epoch;
459 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
/* NOTE(review): O_CREAT without a mode argument is undefined behavior
 * (the created file gets stack-garbage permissions); should pass an
 * explicit mode such as 0644. */
460 if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC)) < 0) {
461 my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
/* Persist the new epoch so a restart resumes from the right file. */
464 update_cur_epoch_file(cur_epoch);
/* Search the singly-linked hash chain `where` for a flow matching `what`.
 * Unfragmented flows match on (sip, dip, proto, sp, dp); fragmented flows
 * match on (sip, dip, proto, ip id). If `prev` is non-NULL it supplies the
 * link pointer to start from on entry and receives the predecessor link of
 * the match on exit, so the caller can unlink the node.
 * NOTE(review): sampled fragment - the chain-walk loop construct, the
 * `done:` label and the final return are not visible in this extract. */
473 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
475 struct Flow **flowpp;
481 if (prev) flowpp = *prev;
484 if (where->sip.s_addr == what->sip.s_addr
485 && where->dip.s_addr == what->dip.s_addr
486 && where->proto == what->proto) {
/* Sum of the two FLOW_FRAG bits distinguishes both-set / both-clear /
 * mixed; only like-with-like comparisons fall through to a match. */
487 switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
489 /* Both unfragmented */
490 if ((what->sp == where->sp)
491 && (what->dp == where->dp)) goto done;
494 /* Both fragmented */
495 if (where->id == what->id) goto done;
/* No match in this node: advance via its next link. */
499 flowpp = &where->next;
503 if (prev) *prev = flowpp;
/* Insert `flow` into the hash table, merging it into an existing matching
 * entry if one is found. `flag` is COPY_INTO (copy from a pending slot) or
 * MOVE_INTO (transfer ownership; `flow` is mem_free()d afterwards).
 * When the merge completes a fragmented datagram, the reassembled flow is
 * unlinked and re-inserted recursively as an unfragmented one.
 * NOTE(review): sampled fragment - the `h = hash_flow(...)` computation,
 * several braces/returns and the debug-buffer plumbing are not visible. */
507 int put_into(struct Flow *flow, int flag
508 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
515 struct Flow *flown, **flowpp;
516 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
521 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
522 sprintf(buf, " %x H:%04x", (unsigned) flow, h);
/* One mutex per hash bucket; held for the whole insert/merge. */
525 pthread_mutex_lock(&flows_mutex[h]);
527 if (!(flown = find(flows[h], flow, &flowpp))) {
528 /* No suitable flow found - add */
529 if (flag == COPY_INTO) {
530 if ((flown = mem_alloc())) {
531 copy_flow(flow, flown);
534 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
/* Allocation failure: the packet's accounting is dropped. */
535 my_log(LOG_ERR, "%s %s. %s",
536 "mem_alloc():", strerror(errno), "packet lost");
/* Link the new entry at the head of the bucket chain. */
541 flow->next = flows[h];
543 #if ((DEBUG) & DEBUG_I)
545 if (flow->flags & FLOW_FRAG) flows_fragmented++;
547 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
549 sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
554 /* Found suitable flow - update */
555 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
556 sprintf(buf, " +> %x", (unsigned) flown);
/* Keep the widest [ctime, mtime] interval across merged packets. */
559 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
560 flown->mtime = flow->mtime;
561 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
562 flown->ctime = flow->ctime;
563 flown->tcp_flags |= flow->tcp_flags;
564 flown->size += flow->size;
565 flown->pkts += flow->pkts;
566 if (flow->flags & FLOW_FRAG) {
567 /* Fragmented flow require some additional work */
/* FLOW_TL: this fragment carried the transport-layer header (offset 0). */
568 if (flow->flags & FLOW_TL) {
571 Several packets with FLOW_TL (attack)
573 flown->sp = flow->sp;
574 flown->dp = flow->dp;
/* FLOW_LASTFRAG: this fragment fixes the whole-datagram size (sizeP). */
576 if (flow->flags & FLOW_LASTFRAG) {
579 Several packets with FLOW_LASTFRAG (attack)
581 flown->sizeP = flow->sizeP;
583 flown->flags |= flow->flags;
584 flown->sizeF += flow->sizeF;
585 if ((flown->flags & FLOW_LASTFRAG)
586 && (flown->sizeF >= flown->sizeP)) {
587 /* All fragments received - flow reassembled */
588 *flowpp = flown->next;
589 pthread_mutex_unlock(&flows_mutex[h]);
590 #if ((DEBUG) & DEBUG_I)
/* Re-insert the reassembled flow as a normal (unfragmented) one. */
595 flown->flags &= ~FLOW_FRAG;
596 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
599 ret = put_into(flown, MOVE_INTO
600 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
606 if (flag == MOVE_INTO) mem_free(flow);
608 pthread_mutex_unlock(&flows_mutex[h]);
/* Serialize `fields` NetFlow fields, in the order given by `format`, into
 * the buffer at `p`, taking values from `flow` (or from the emitter globals
 * for header fields, in which case `flow` is NULL). Returns the advanced
 * write pointer. Multi-byte values are converted to network byte order;
 * sp/dp are already stored big-endian in the flow.
 * NOTE(review): sampled fragment - the switch statement itself, the break
 * statements and several case labels are not visible in this extract. */
612 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
616 for (i = 0; i < fields; i++) {
617 #if ((DEBUG) & DEBUG_F)
618 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
621 case NETFLOW_IPV4_SRC_ADDR:
622 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
623 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
626 case NETFLOW_IPV4_DST_ADDR:
627 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
/* NOTE(review): hard-coded test hook - logs whenever the destination is
 * 64.34.177.39 (the "-f test facility" from the changelog). Remove or
 * make configurable before production use. */
628 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
629 my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
631 p += NETFLOW_IPV4_DST_ADDR_SIZE;
634 case NETFLOW_INPUT_SNMP:
635 *((uint16_t *) p) = htons(flow->iif);
636 p += NETFLOW_INPUT_SNMP_SIZE;
639 case NETFLOW_OUTPUT_SNMP:
640 *((uint16_t *) p) = htons(flow->oif);
641 p += NETFLOW_OUTPUT_SNMP_SIZE;
644 case NETFLOW_PKTS_32:
645 *((uint32_t *) p) = htonl(flow->pkts);
646 p += NETFLOW_PKTS_32_SIZE;
649 case NETFLOW_BYTES_32:
650 *((uint32_t *) p) = htonl(flow->size);
651 p += NETFLOW_BYTES_32_SIZE;
/* First/Last switched: flow create/update times as ms of uptime. */
654 case NETFLOW_FIRST_SWITCHED:
655 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
656 p += NETFLOW_FIRST_SWITCHED_SIZE;
659 case NETFLOW_LAST_SWITCHED:
660 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
661 p += NETFLOW_LAST_SWITCHED_SIZE;
/* Ports are captured from the wire and kept big-endian; no htons here. */
664 case NETFLOW_L4_SRC_PORT:
665 *((uint16_t *) p) = flow->sp;
666 p += NETFLOW_L4_SRC_PORT_SIZE;
669 case NETFLOW_L4_DST_PORT:
670 *((uint16_t *) p) = flow->dp;
671 p += NETFLOW_L4_DST_PORT_SIZE;
675 *((uint8_t *) p) = flow->proto;
676 p += NETFLOW_PROT_SIZE;
679 case NETFLOW_SRC_TOS:
680 *((uint8_t *) p) = flow->tos;
681 p += NETFLOW_SRC_TOS_SIZE;
684 case NETFLOW_TCP_FLAGS:
685 *((uint8_t *) p) = flow->tcp_flags;
686 p += NETFLOW_TCP_FLAGS_SIZE;
/* --- Header-only fields below: taken from emitter globals, flow==NULL. */
689 case NETFLOW_VERSION:
690 *((uint16_t *) p) = htons(netflow->Version);
691 p += NETFLOW_VERSION_SIZE;
695 *((uint16_t *) p) = htons(emit_count);
696 p += NETFLOW_COUNT_SIZE;
700 *((uint32_t *) p) = htonl(getuptime(&emit_time));
701 p += NETFLOW_UPTIME_SIZE;
704 case NETFLOW_UNIX_SECS:
705 *((uint32_t *) p) = htonl(emit_time.sec);
706 p += NETFLOW_UNIX_SECS_SIZE;
709 case NETFLOW_UNIX_NSECS:
710 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
711 p += NETFLOW_UNIX_NSECS_SIZE;
714 case NETFLOW_FLOW_SEQUENCE:
/* NOTE(review): the export sequence number is deliberately zeroed here
 * (emit_sequence line commented out) - collectors relying on sequence
 * continuity will see this as loss-free but unsequenced. Confirm intent. */
715 //*((uint32_t *) p) = htonl(emit_sequence);
716 *((uint32_t *) p) = 0;
717 p += NETFLOW_FLOW_SEQUENCE_SIZE;
721 /* Unsupported (uint8_t) */
722 case NETFLOW_ENGINE_TYPE:
723 case NETFLOW_ENGINE_ID:
724 case NETFLOW_FLAGS7_1:
725 case NETFLOW_SRC_MASK:
726 case NETFLOW_DST_MASK:
727 *((uint8_t *) p) = 0;
728 p += NETFLOW_PAD8_SIZE;
/* XID field reuses flow->tos (per changelog: xid carried in the header). */
731 *((uint16_t *) p) = flow->tos;
732 p += NETFLOW_XID_SIZE;
735 /* Unsupported (uint16_t) */
738 case NETFLOW_FLAGS7_2:
739 *((uint16_t *) p) = 0;
740 p += NETFLOW_PAD16_SIZE;
744 /* Unsupported (uint32_t) */
745 case NETFLOW_IPV4_NEXT_HOP:
746 case NETFLOW_ROUTER_SC:
747 *((uint32_t *) p) = 0;
748 p += NETFLOW_PAD32_SIZE;
752 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
753 format, i, format[i]);
757 #if ((DEBUG) & DEBUG_F)
758 my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
/* Body of the emitter thread: drains the flows_emit list, packs flows into
 * a NetFlow PDU with fill(), and delivers the PDU to each peer (file,
 * mirror, or round-robin rotate), with optional byte-rate pacing.
 * NOTE(review): sampled fragment - the emit_thread() definition line, the
 * outer loop, several braces and the free/continue paths are not visible. */
765 Workaround for clone()-based threads
766 Try to change EUID independently of main thread
/* Drop privileges in this thread when -u was given. */
770 setregid(pw->pw_gid, pw->pw_gid);
771 setreuid(pw->pw_uid, pw->pw_uid);
780 struct timespec timeout;
781 int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
/* Flow records are appended after the PDU header inside emit_packet. */
783 p = (void *) &emit_packet + netflow->HeaderSize;
789 pthread_mutex_lock(&emit_mutex);
/* Wait for flows; on timeout with a partially filled PDU, flush it. */
790 while (!flows_emit) {
791 gettimeofday(&now, 0);
792 timeout.tv_sec = now.tv_sec + emit_timeout;
793 /* Do not wait until emit_packet will filled - it may be too long */
794 if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
795 pthread_mutex_unlock(&emit_mutex);
/* Pop one flow from the emit list while holding emit_mutex. */
800 flows_emit = flows_emit->next;
801 #if ((DEBUG) & DEBUG_I)
804 pthread_mutex_unlock(&emit_mutex);
/* Re-base start_time so exported uptimes stay consistent. */
808 gettime(&start_time);
809 start_time.sec -= start_time_offset;
812 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
816 printf("Emit count = %d\n", emit_count);
/* PDU full: build the header and deliver to every peer. */
819 if (emit_count == netflow->MaxFlows) {
822 p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
823 size = netflow->HeaderSize + emit_count * netflow->FlowSize;
824 /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
825 #ifdef STD_NETFLOW_PDU
826 if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
829 for (i = 0; i < npeers; i++) {
/* PEER_FILE: append the PDU to the epoch-rotated log file. */
830 if (peers[i].type == PEER_FILE) {
831 if (netflow->SeqOffset)
/* NOTE(review): writes peers[0].seq here but peers[i].seq in the
 * network path below - looks like a copy/paste slip; confirm. */
832 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
833 peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
834 ret = write(peers[i].write_fd, emit_packet, size);
837 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
838 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
839 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
843 #if ((DEBUG) & DEBUG_E)
845 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
846 emit_count, i + 1, peers[i].seq);
849 peers[i].seq += emit_count;
/* Optional pacing: sleep emit_rate_delay ns per emit_rate_bytes sent. */
852 if (emit_rate_bytes) {
854 delay = sent / emit_rate_bytes;
856 sent %= emit_rate_bytes;
858 timeout.tv_nsec = emit_rate_delay * delay;
859 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
/* PEER_MIRROR sends every PDU; PEER_ROTATE sends to one peer per round. */
864 if (peers[i].type == PEER_MIRROR) goto sendreal;
866 if (peers[i].type == PEER_ROTATE)
867 if (peer_rot_cur++ == peer_rot_work) {
869 if (netflow->SeqOffset)
870 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
871 ret = send(peers[i].write_fd, emit_packet, size, 0);
873 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
874 my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
875 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
878 #if ((DEBUG) & DEBUG_E)
880 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
881 emit_count, i + 1, peers[i].seq);
884 peers[i].seq += emit_count;
887 if (emit_rate_bytes) {
889 delay = sent / emit_rate_bytes;
891 sent %= emit_rate_bytes;
893 timeout.tv_nsec = emit_rate_delay * delay;
894 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
/* Advance the round-robin target for the next PDU. */
899 if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
900 emit_sequence += emit_count;
902 #if ((DEBUG) & DEBUG_I)
/* Consumer side of the pending ring: waits until cap_thread() marks the
 * flow at pending_tail FLOW_PENDING, copies it into the hash table via
 * put_into(COPY_INTO), then clears the slot and advances the tail.
 * NOTE(review): sampled fragment - the outer for(;;) loop, braces and the
 * thread's return are not visible in this extract. */
909 void *unpending_thread()
912 struct timespec timeout;
913 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
920 pthread_mutex_lock(&unpending_mutex);
/* Sleep (with periodic timeout) until the next slot is marked pending. */
923 while (!(pending_tail->flags & FLOW_PENDING)) {
924 gettimeofday(&now, 0);
925 timeout.tv_sec = now.tv_sec + unpending_timeout;
926 pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
929 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
932 if (put_into(pending_tail, COPY_INTO
933 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
937 #if ((DEBUG) & DEBUG_I)
/* put_into() failed (allocation): the packet's accounting is lost. */
938 pkts_lost_unpending++;
942 #if ((DEBUG) & DEBUG_U)
943 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
/* Release the ring slot back to cap_thread() and advance. */
946 pending_tail->flags = 0;
947 pending_tail = pending_tail->next;
948 #if ((DEBUG) & DEBUG_I)
/* Body of the expiry scanner: every scan_interval seconds it walks all hash
 * buckets, moves expired fragmented flows onto scan_frag_dreg (later
 * re-inserted unfragmented), and moves inactive/over-age complete flows
 * onto flows_emit for the emitter.
 * NOTE(review): sampled fragment - the scan_thread() definition line, the
 * outer loop, the per-bucket chain-walk header and several braces are not
 * visible in this extract. */
956 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
960 struct Flow *flow, **flowpp;
962 struct timespec timeout;
967 pthread_mutex_lock(&scan_mutex);
/* Sleep until the next scan tick (or an explicit wakeup signal). */
971 timeout.tv_sec = now.sec + scan_interval;
972 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
975 #if ((DEBUG) & DEBUG_S)
976 my_log(LOG_DEBUG, "S: %d", now.sec);
978 for (i = 0; i < 1 << HASH_BITS ; i++) {
979 pthread_mutex_lock(&flows_mutex[i]);
983 if (flow->flags & FLOW_FRAG) {
984 /* Process fragmented flow */
/* Incomplete reassembly older than frag_lifetime: give up waiting. */
985 if ((now.sec - flow->mtime.sec) > frag_lifetime) {
986 /* Fragmented flow expired - put it into special chain */
987 #if ((DEBUG) & DEBUG_I)
991 *flowpp = flow->next;
993 flow->flags &= ~FLOW_FRAG;
994 flow->next = scan_frag_dreg;
995 scan_frag_dreg = flow;
1000 /* Flow is not fragmented */
/* Expire on inactivity, or force-export long-lived active flows. */
1001 if ((now.sec - flow->mtime.sec) > inactive_lifetime
1002 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
1004 #if ((DEBUG) & DEBUG_S)
1005 my_log(LOG_DEBUG, "S: E %x", flow);
1007 #if ((DEBUG) & DEBUG_I)
/* Unlink from the bucket and push onto the emit list. */
1010 *flowpp = flow->next;
1011 pthread_mutex_lock(&emit_mutex);
1012 flow->next = flows_emit;
1014 #if ((DEBUG) & DEBUG_I)
1017 pthread_mutex_unlock(&emit_mutex);
1022 flowpp = &flow->next;
1025 pthread_mutex_unlock(&flows_mutex[i]);
1027 if (flows_emit) pthread_cond_signal(&emit_cond);
/* Re-insert expired fragment flows as unfragmented partial flows. */
1029 while (scan_frag_dreg) {
1030 flow = scan_frag_dreg;
1031 scan_frag_dreg = flow->next;
1032 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1035 put_into(flow, MOVE_INTO
1036 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1040 #if ((DEBUG) & DEBUG_S)
1041 my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
/* Body of the capture thread: reads packets from the netfilter ULOG socket,
 * parses the IPv4 (and partial transport) headers into the pre-allocated
 * pending ring at pending_head, and signals unpending_thread().
 * NOTE(review): sampled fragment - the cap_thread() definition line, the
 * outer loop, `nl`/`tl`/`off_tl` declarations, continue paths and many
 * braces are not visible in this extract. */
1049 struct ulog_packet_msg *ulog_msg;
1053 int len, off_frag, psize;
1054 #if ((DEBUG) & DEBUG_C)
/* Blocking read of a batch of ULOG messages into cap_buf. */
1062 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1064 my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
/* Iterate over every packet message contained in the batch. */
1067 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1069 #if ((DEBUG) & DEBUG_C)
1070 sprintf(logbuf, "C: %d", ulog_msg->data_len);
1073 nl = (void *) &ulog_msg->payload;
1074 psize = ulog_msg->data_len;
/* Ignore anything that is not a complete IPv4 header. */
1077 if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1078 #if ((DEBUG) & DEBUG_C)
1079 strcat(logbuf, " U");
1080 my_log(LOG_DEBUG, "%s", logbuf);
1082 #if ((DEBUG) & DEBUG_I)
/* Ring slot still in use by unpending_thread(): queue overflow. */
1088 if (pending_head->flags) {
1089 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1091 # if ((DEBUG) & DEBUG_C)
1096 "pending queue full:", "packet lost");
1098 #if ((DEBUG) & DEBUG_I)
1099 pkts_lost_capture++;
1104 #if ((DEBUG) & DEBUG_I)
/* Fill the head slot in place; it becomes visible only when flagged. */
1108 flow = pending_head;
1110 /* ?FIXME? Add sanity check for ip_len? */
1111 flow->size = ntohs(nl->ip_len);
1112 #if ((DEBUG) & DEBUG_I)
1113 size_total += flow->size;
1116 flow->sip = nl->ip_src;
1117 flow->dip = nl->ip_dst;
/* -M option: use the netfilter mark as the ToS byte (carries the xid). */
1118 flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
/* NOTE(review): hard-coded test hook for 64.34.177.39 (corewars.org);
 * remove or make configurable before production use. */
1119 if ((flow->dip.s_addr == inet_addr("64.34.177.39")) || (flow->sip.s_addr == inet_addr("64.34.177.39"))) {
1120 my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->tos);
1122 flow->iif = snmp_index(ulog_msg->indev_name);
1123 flow->oif = snmp_index(ulog_msg->outdev_name);
1124 flow->proto = nl->ip_p;
1126 flow->tcp_flags = 0;
1130 /* Packets captured from the OUTPUT table don't contain a valid timestamp */
1131 if (ulog_msg->timestamp_sec) {
1132 flow->ctime.sec = ulog_msg->timestamp_sec;
1133 flow->ctime.usec = ulog_msg->timestamp_usec;
1134 } else gettime(&flow->ctime);
1135 flow->mtime = flow->ctime;
/* Fragment offset in bytes (the header field counts 8-byte units). */
1137 off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1140 Offset (from network layer) to transport layer header/IP data
1141 IOW IP header size ;-)
1144 Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1146 off_tl = nl->ip_hl << 2;
1147 tl = (void *) nl + off_tl;
1149 /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1150 flow->sizeF = ntohs(nl->ip_len) - off_tl;
/* Clamp nonsense lengths so later pointer math stays in bounds. */
1152 if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1153 if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1155 if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1156 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1157 #if ((DEBUG) & DEBUG_C)
1158 strcat(logbuf, " F");
1160 #if ((DEBUG) & DEBUG_I)
1161 pkts_total_fragmented++;
1163 flow->flags |= FLOW_FRAG;
1164 flow->id = nl->ip_id;
/* Last fragment (IP_MF clear) fixes the whole-datagram size. */
1166 if (!(ntohs(nl->ip_off) & IP_MF)) {
1167 /* Packet without IP_MF contains information about whole datagram size */
1168 flow->flags |= FLOW_LASTFRAG;
1169 /* size = frag_offset*8 + data_size */
1170 flow->sizeP = off_frag + flow->sizeF;
1174 #if ((DEBUG) & DEBUG_C)
1175 sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1176 strcat(logbuf, buf);
1177 sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1178 strcat(logbuf, buf);
1182 Fortunately most interesting transport layer information fit
1183 into first 8 bytes of IP data field (minimal nonzero size).
1184 Thus we don't need actual packet reassembling to build whole
1185 transport layer data. We only check the fragment offset for
1186 zero value to find packet with this information.
/* First fragment (or unfragmented) with >= 8 bytes of L4 data: extract
 * ports (TCP/UDP) or type/code (ICMP). */
1188 if (!off_frag && psize >= 8) {
1189 switch (flow->proto) {
1192 flow->sp = ((struct udphdr *)tl)->uh_sport;
1193 flow->dp = ((struct udphdr *)tl)->uh_dport;
/* ICMP: encode type/code in the port fields, NetFlow-style. */
1198 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1199 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1202 #ifdef ICMP_TRICK_CISCO
1204 flow->dp = *((int32_t *) tl);
1209 /* Unknown transport layer */
1210 #if ((DEBUG) & DEBUG_C)
1211 strcat(logbuf, " U");
1218 #if ((DEBUG) & DEBUG_C)
1219 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1220 strcat(logbuf, buf);
/* Mark that transport-layer info is present (matters for fragments). */
1222 flow->flags |= FLOW_TL;
1226 /* Check for tcp flags presence (including CWR and ECE). */
/* TCP flags byte sits at offset 13 of the TCP header; reachable even in
 * a fragment as long as this packet covers that offset. */
1227 if (flow->proto == IPPROTO_TCP
1229 && psize >= 16 - off_frag) {
1230 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1231 #if ((DEBUG) & DEBUG_C)
1232 sprintf(buf, " TCP:%x", flow->tcp_flags);
1233 strcat(logbuf, buf);
1237 #if ((DEBUG) & DEBUG_C)
1238 sprintf(buf, " => %x", (unsigned) flow);
1239 strcat(logbuf, buf);
1240 my_log(LOG_DEBUG, "%s", logbuf);
1243 #if ((DEBUG) & DEBUG_I)
1245 pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1246 if (pending_queue_trace < pending_queue_trace_candidate)
1247 pending_queue_trace = pending_queue_trace_candidate;
1250 /* Flow complete - inform unpending_thread() about it */
1251 pending_head->flags |= FLOW_PENDING;
1252 pending_head = pending_head->next;
1254 pthread_cond_signal(&unpending_cond);
1260 /* Copied out of CoDemux */
/* Daemonize: fork, write the child's pid to PIDFILE from the parent, then
 * have the child detach (session/cwd changes below).
 * NOTE(review): sampled fragment - pid declaration, returns/exit calls,
 * fclose(pidfile), setsid()/umask() and the closing brace are not visible.
 * Also note PIDFILE is "/var/log/fprobe-ulog.pid" (a log directory) while
 * a separate pidfilepath is built elsewhere - confirm which is canonical. */
1262 static int init_daemon() {
1266 pidfile = fopen(PIDFILE, "w");
1267 if (pidfile == NULL) {
1268 my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
1271 if ((pid = fork()) < 0) {
1273 my_log(LOG_ERR, "Could not fork!\n");
1276 else if (pid != 0) {
1277 /* i'm the parent, writing down the child pid */
/* NOTE(review): pid_t is signed; "%u" assumes non-negative pid - confirm. */
1278 fprintf(pidfile, "%u\n", pid);
1283 /* close the pid file */
1286 /* routines for any daemon process
1287 1. create a new session
1288 2. change directory to the root
1289 3. change the file creation permission
/* Working directory for the daemon (not "/" as the comment suggests). */
1292 chdir("/var/local/fprobe");
1298 int main(int argc, char **argv)
1301 char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1302 int c, i, write_fd, memory_limit = 0;
1303 struct addrinfo hints, *res;
1304 struct sockaddr_in saddr;
1305 pthread_attr_t tattr;
1306 struct sigaction sigact;
1307 static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1308 struct timeval timeout;
1310 sched_min = sched_get_priority_min(SCHED);
1311 sched_max = sched_get_priority_max(SCHED);
1313 memset(&saddr, 0 , sizeof(saddr));
1314 memset(&hints, 0 , sizeof(hints));
1315 hints.ai_flags = AI_PASSIVE;
1316 hints.ai_family = AF_INET;
1317 hints.ai_socktype = SOCK_DGRAM;
1319 /* Process command line options */
1322 while ((c = my_getopt(argc, argv, parms)) != -1) {
1332 if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1333 if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1334 if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1335 if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1336 if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1337 if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1338 if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1339 if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1340 if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1341 if (parms[nflag].count) {
1342 switch (atoi(parms[nflag].arg)) {
1344 netflow = &NetFlow1;
1351 netflow = &NetFlow7;
1355 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1359 if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1360 if (parms[lflag].count) {
1361 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1364 sprintf(errpbuf, "[%s]", log_suffix);
1365 strcat(ident, errpbuf);
1368 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1369 if (log_suffix) *--log_suffix = ':';
1371 if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1373 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1376 sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1377 if (parms[qflag].count) {
1378 pending_queue_length = atoi(parms[qflag].arg);
1379 if (pending_queue_length < 1) {
1380 fprintf(stderr, "Illegal %s\n", "pending queue length");
1384 if (parms[rflag].count) {
1385 schedp.sched_priority = atoi(parms[rflag].arg);
1386 if (schedp.sched_priority
1387 && (schedp.sched_priority < sched_min
1388 || schedp.sched_priority > sched_max)) {
1389 fprintf(stderr, "Illegal %s\n", "realtime priority");
1393 if (parms[Bflag].count) {
1394 sockbufsize = atoi(parms[Bflag].arg) << 10;
1396 if (parms[bflag].count) {
1397 bulk_quantity = atoi(parms[bflag].arg);
1398 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1399 fprintf(stderr, "Illegal %s\n", "bulk size");
1403 if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1404 if (parms[Xflag].count) {
/* ---- -X: parse SNMP interface rules, "basename:base[,basename:base]..." ---- */
1405 for(i = 0; parms[Xflag].arg[i]; i++)
/* Count ':' separators to size the rules array. */
1406 if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1407 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
/* Tokenize in place: strtok() overwrites each separator with NUL. */
1409 rule = strtok(parms[Xflag].arg, ":");
1410 for (i = 0; rule; i++) {
1411 snmp_rules[i].len = strlen(rule);
/* NOTE(review): `> IFNAMSIZ` admits len == IFNAMSIZ; the strncpy below then
 * copies exactly len bytes with no room for a terminating NUL if basename[]
 * is IFNAMSIZ bytes wide — looks like this should be `>=`; confirm against
 * the snmp_rule declaration. */
1412 if (snmp_rules[i].len > IFNAMSIZ) {
1413 fprintf(stderr, "Illegal %s\n", "interface basename");
/* NOTE(review): strncpy bounded by the source length never writes a NUL
 * terminator — presumably basename is zero-initialized elsewhere; verify. */
1416 strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
/* Restore the ',' that a previous strtok() pass turned into NUL, so the
 * original option string can still be logged/inspected later. */
1417 if (!*(rule - 1)) *(rule - 1) = ',';
1418 rule = strtok(NULL, ",");
1420 fprintf(stderr, "Illegal %s\n", "SNMP rule");
/* NOTE(review): atoi() cannot report bad input; strtol() with endptr would
 * let malformed SNMP base values be rejected. */
1423 snmp_rules[i].base = atoi(rule);
1425 rule = strtok(NULL, ":");
/* ---- -t: emit rate limit, "bytes:delay" ---- */
1429 if (parms[tflag].count)
1430 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
/* ---- -a: local source address for exported NetFlow datagrams ---- */
1431 if (parms[aflag].count) {
1432 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1434 fprintf(stderr, "Illegal %s\n", "source address");
/* NOTE(review): `res` from getaddrinfo() is never released with
 * freeaddrinfo() — a one-time startup leak here and at the collector
 * lookups below; harmless in practice but worth tidying. */
1437 saddr = *((struct sockaddr_in *) res->ai_addr);
/* ---- -u: user to drop privileges to after initialization (see setreuid below) ---- */
1441 if (parms[uflag].count)
1442 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1443 fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1448 /* Process collectors parameters. Brrrr... :-[ */
1450 npeers = argc - optind;
1452 /* Send to remote Netflow collector */
/* Each remaining argv entry is one collector spec: host:port[/lhost[/type]]. */
1453 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1454 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1456 if (!(dport = strchr(dhost, ':'))) goto bad_collector;
/* One connected UDP socket per collector. */
1458 if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1459 fprintf(stderr, "socket(): %s\n", strerror(errno));
1462 peers[npeers].write_fd = write_fd;
1463 peers[npeers].type = PEER_MIRROR;
1464 peers[npeers].laddr = saddr;
1465 peers[npeers].seq = 0;
/* Optional "/lhost" and "/type" suffixes modify source address and peer mode. */
1466 if ((lhost = strchr(dport, '/'))) {
1468 if ((type = strchr(lhost, '/'))) {
1476 peers[npeers].type = PEER_ROTATE;
1485 if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1486 peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
/* Bind to the requested local address so exports carry the right source IP. */
1490 if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1491 sizeof(struct sockaddr_in))) {
1492 fprintf(stderr, "bind(): %s\n", strerror(errno));
1495 if (getaddrinfo(dhost, dport, &hints, &res)) {
1497 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1500 peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
/* connect() a UDP socket: fixes the destination so plain send() works later. */
1502 if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1503 sizeof(struct sockaddr_in))) {
1504 fprintf(stderr, "connect(): %s\n", strerror(errno));
1508 /* Restore command line */
/* Put back the '/' separators that the suffix parsing NUL-ed out above. */
1509 if (type) *--type = '/';
1510 if (lhost) *--lhost = '/';
/* ---- -f: write flow records to a file instead of a network collector ---- */
1514 else if (parms[fflag].count) {
1516 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
/* NOTE(review): malloc(strnlen(...)) reserves no byte for the terminating
 * NUL, and the strncpy below is bounded by MAX_PATH_LEN, not by the
 * allocation size — for a long -f path this writes past the buffer.
 * Allocating strnlen(...)+1 (or using strndup/snprintf) would fix both. */
1517 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1518 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
/* START_VALUE marks the fd as not-yet-opened; the writer thread opens it. */
1520 peers[npeers].write_fd = START_VALUE;
1521 peers[npeers].type = PEER_FILE;
1522 peers[npeers].seq = 0;
/* ---- ULOG capture setup: netlink socket bound to the -U group mask ---- */
1531 if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1532 ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1534 fprintf(stderr, "libipulog initialization error: %s",
1535 ipulog_strerror(ipulog_errno));
/* Enlarge the kernel receive buffer (-B) to survive packet bursts. */
1539 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1540 &sockbufsize, sizeof(sockbufsize)) < 0)
1541 fprintf(stderr, "setsockopt(): %s", strerror(errno));
1543 /* Daemonize (if log destination stdout-free) */
1545 my_log_open(ident, verbosity, log_dest);
/* log_dest bit 1 (value 2) = log to stdout; only detach when it is unset. */
1549 if (!(log_dest & 2)) {
1550 /* Crash-proofing - Sapan*/
1554 fprintf(stderr, "fork(): %s", strerror(errno));
/* Detach the daemon child from the terminal. */
1559 freopen("/dev/null", "r", stdin);
1560 freopen("/dev/null", "w", stdout);
1561 freopen("/dev/null", "w", stderr);
/* NOTE(review): parent loops in wait3() — presumably the "crash-proofing"
 * supervisor that respawns the child; the respawn path is elided here, so
 * confirm against the full source before changing. */
1565 while (wait3(NULL,0,NULL) < 1);
/* Unbuffered stdio so interleaved diagnostics appear immediately. */
1569 setvbuf(stdout, (char *)0, _IONBF, 0);
1570 setvbuf(stderr, (char *)0, _IONBF, 0);
/* Append "[pid]" to the syslog ident string. */
1574 sprintf(errpbuf, "[%ld]", (long) pid);
1575 strcat(ident, errpbuf);
1577 /* Initialization */
1579 hash_init(); /* Actually for crc16 only */
1580 mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
/* One mutex per hash bucket in the flow table. */
1581 for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1584 /* Hope 12 days is enough :-/ */
1585 start_time_offset = 1 << 20;
1587 /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1589 gettime(&start_time);
1592 Build static pending queue as circular buffer.
1594 if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1595 pending_tail = pending_head;
/* Allocate pending_queue_length - 1 more nodes and chain them. */
1596 for (i = pending_queue_length - 1; i--;) {
1597 if (!(pending_tail->next = mem_alloc())) {
1599 my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1602 pending_tail = pending_tail->next;
/* Close the ring and reset the tail to the head. */
1604 pending_tail->next = pending_head;
1605 pending_tail = pending_head;
/* ---- Signal setup: handle SIGTERM (and SIGUSR1 in debug builds), block
 * them in all threads so only the main loop's sigwait-style check sees them. */
1607 sigemptyset(&sig_mask);
1608 sigact.sa_handler = &sighandler;
1609 sigact.sa_mask = sig_mask;
1610 sigact.sa_flags = 0;
1611 sigaddset(&sig_mask, SIGTERM);
1612 sigaction(SIGTERM, &sigact, 0);
1613 #if ((DEBUG) & DEBUG_I)
1614 sigaddset(&sig_mask, SIGUSR1);
1615 sigaction(SIGUSR1, &sigact, 0);
1617 if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1618 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1622 my_log(LOG_INFO, "Starting %s...", VERSION);
/* ---- -c: chroot jail (must run before dropping root below) ---- */
1624 if (parms[cflag].count) {
1625 if (chdir(parms[cflag].arg) || chroot(".")) {
1626 my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
/* ---- Worker threads: each gets an increasing real-time priority ---- */
1631 schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1632 pthread_attr_init(&tattr);
1633 for (i = 0; i < THREADS - 1; i++) {
/* Only request the RT scheduler when the computed priority is positive. */
1634 if (schedp.sched_priority > 0) {
/* NOTE(review): pthread_attr_* and pthread_create return their error code
 * rather than setting errno, so strerror(errno) here and below may report
 * an unrelated message; strerror(ret) would be accurate. */
1635 if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1636 (pthread_attr_setschedparam(&tattr, &schedp))) {
1637 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1641 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1642 my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
/* Detached: workers are never joined; they run for the process lifetime. */
1645 pthread_detach(thid);
1646 schedp.sched_priority++;
/* ---- Drop privileges (-u): groups first, then gid, then uid ---- */
1650 if (setgroups(0, NULL)) {
1651 my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1654 if (setregid(pw->pw_gid, pw->pw_gid)) {
1655 my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1658 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1659 my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
/* ---- Pid file: failure to create is logged but not fatal ---- */
1664 if (!(pidfile = fopen(pidfilepath, "w")))
1665 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1667 fprintf(pidfile, "%ld\n", (long) pid);
/* NOTE(review): pid is pid_t printed with %d; the pid-file write above
 * casts to long with %ld — the cast should be applied here too. */
1671 my_log(LOG_INFO, "pid: %d", pid);
/* Log the effective configuration in one line for postmortem debugging. */
1672 my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1673 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1674 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1675 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1676 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1677 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1678 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1679 for (i = 0; i < nsnmp_rules; i++) {
1680 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1681 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1683 for (i = 0; i < npeers; i++) {
1684 switch (peers[i].type) {
/* inet_ntoa() returns a static buffer, so the first address must be copied
 * into errpbuf before the second inet_ntoa() call in the same my_log(). */
1692 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1693 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1694 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
/* From here on the main thread handles signals; workers do the capture. */
1697 pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
/* ---- Main loop: sleep scan_interval, then service pending work/signals ---- */
1699 timeout.tv_usec = 0;
1701 || (total_elements - free_elements - pending_queue_length)
1703 || pending_tail->flags) {
1706 timeout.tv_sec = scan_interval;
/* select() with no fd sets is used as a portable sub-second-capable sleep. */
1707 select(0, 0, 0, 0, &timeout);
/* SIGTERM: flush the flow cache by forcing every lifetime to expire, then
 * wake the scan and unpending threads so they drain immediately. */
1710 if (sigs & SIGTERM_MASK && !killed) {
1711 sigs &= ~SIGTERM_MASK;
1712 my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1715 active_lifetime = -1;
1716 inactive_lifetime = -1;
1718 unpending_timeout = 1;
1720 pthread_cond_signal(&scan_cond);
1721 pthread_cond_signal(&unpending_cond);
1724 #if ((DEBUG) & DEBUG_I)
/* SIGUSR1 (debug builds): dump internal statistics. */
1725 if (sigs & SIGUSR1_MASK) {
1726 sigs &= ~SIGUSR1_MASK;
/* ---- Shutdown: remove the pid file and log completion ---- */
1731 remove(pidfilepath);
1732 #if ((DEBUG) & DEBUG_I)
1735 my_log(LOG_INFO, "Done.");