2 Copyright (C) Slava Astashonok <sla@0n.ru>
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License.
7 $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
9 7/11/2007 Sapan Bhatia <sapanb@cs.princeton.edu>
11 Added data collection (-f) functionality, xid support in the header and log file
17 /* stdout, stderr, freopen() */
23 /* getopt(), alarm(), getpid(), setsid(), chdir() */
32 #include <libipulog/libipulog.h>
33 struct ipulog_handle {
36 struct sockaddr_nl local;
37 struct sockaddr_nl peer;
38 struct nlmsghdr* last_nlhdr;
41 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
42 #include <sys/types.h>
43 #include <netinet/in_systm.h>
44 #include <sys/socket.h>
45 #include <netinet/in.h>
46 #include <arpa/inet.h>
47 #include <netinet/ip.h>
48 #include <netinet/tcp.h>
49 #include <netinet/udp.h>
50 #include <netinet/ip_icmp.h>
53 #include <sys/param.h>
78 #include <sys/select.h>
84 #include <fprobe-ulog.h>
86 #include <my_getopt.h>
118 static struct getopt_parms parms[] = {
119 {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
120 {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
121 {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
122 {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
123 {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
124 {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
125 {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
126 {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
127 {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
129 {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
130 {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
132 {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
133 {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
134 {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
135 {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
136 {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
137 {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
138 {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
139 {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
140 {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
141 {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
146 extern int optind, opterr, optopt;
149 extern struct NetFlow NetFlow1;
150 extern struct NetFlow NetFlow5;
151 extern struct NetFlow NetFlow7;
153 #define mark_is_tos parms[Mflag].count
154 static unsigned scan_interval = 5;
155 static int frag_lifetime = 30;
156 static int inactive_lifetime = 60;
157 static int active_lifetime = 300;
158 static int sockbufsize;
159 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
160 #if (MEM_BITS == 0) || (MEM_BITS == 16)
161 #define BULK_QUANTITY 10000
163 #define BULK_QUANTITY 200
166 static unsigned epoch_length=60, log_epochs=1;
167 static unsigned cur_epoch=0,prev_uptime=0;
169 static unsigned bulk_quantity = BULK_QUANTITY;
170 static unsigned pending_queue_length = 100;
171 static struct NetFlow *netflow = &NetFlow5;
172 static unsigned verbosity = 6;
173 static unsigned log_dest = MY_LOG_SYSLOG;
174 static struct Time start_time;
175 static long start_time_offset;
178 extern unsigned total_elements;
179 extern unsigned free_elements;
180 extern unsigned total_memory;
181 #if ((DEBUG) & DEBUG_I)
182 static unsigned emit_pkts, emit_queue;
183 static uint64_t size_total;
184 static unsigned pkts_total, pkts_total_fragmented;
185 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
186 static unsigned pkts_pending, pkts_pending_done;
187 static unsigned pending_queue_trace, pending_queue_trace_candidate;
188 static unsigned flows_total, flows_fragmented;
190 static unsigned emit_count;
191 static uint32_t emit_sequence;
192 static unsigned emit_rate_bytes, emit_rate_delay;
193 static struct Time emit_time;
194 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
195 static pthread_t thid;
196 static sigset_t sig_mask;
197 static struct sched_param schedp;
198 static int sched_min, sched_max;
199 static int npeers, npeers_rot;
200 static struct peer *peers;
203 static struct Flow *flows[1 << HASH_BITS];
204 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
206 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
207 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
209 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
210 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
211 static struct Flow *pending_head, *pending_tail;
212 static struct Flow *scan_frag_dreg;
214 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
215 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
216 static struct Flow *flows_emit;
218 static char ident[256] = "fprobe-ulog";
219 static FILE *pidfile;
220 static char *pidfilepath;
223 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
224 static struct ipulog_handle *ulog_handle;
225 static uint32_t ulog_gmask = 1;
226 static char *cap_buf;
227 static int nsnmp_rules;
228 static struct snmp_rule *snmp_rules;
229 static struct passwd *pw = 0;
234 "fprobe-ulog: a NetFlow probe. Version %s\n"
235 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
237 "-h\t\tDisplay this help\n"
238 "-U <mask>\tULOG group bitwise mask [1]\n"
239 "-s <seconds>\tHow often scan for expired flows [5]\n"
240 "-g <seconds>\tFragmented flow lifetime [30]\n"
241 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
242 "-f <filename>\tLog flow data in a file\n"
243 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
244 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
245 "-a <address>\tUse <address> as source for NetFlow flow\n"
246 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
247 "-M\t\tUse netfilter mark value as ToS flag\n"
248 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
249 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
250 "-q <flows>\tPending queue length [100]\n"
251 "-B <kilobytes>\tKernel capture buffer size [0]\n"
252 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
253 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
254 "-c <directory>\tDirectory to chroot to\n"
255 "-u <user>\tUser to run as\n"
256 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
257 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
258 "-y <remote:port>\tAddress of the NetFlow collector\n"
259 "-f <writable file>\tFile to write data into\n"
260 "-T <n>\tRotate log file every n epochs\n"
261 "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
262 "-E <[1..60]>\tSize of an epoch in minutes\n"
264 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
268 #if ((DEBUG) & DEBUG_I)
271 my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
272 pkts_total, pkts_total_fragmented, size_total,
273 pkts_pending - pkts_pending_done, pending_queue_trace);
274 my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
275 pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
276 my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
277 flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
278 my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
279 total_elements, free_elements, total_memory);
283 void sighandler(int sig)
287 sigs |= SIGTERM_MASK;
289 #if ((DEBUG) & DEBUG_I)
291 sigs |= SIGUSR1_MASK;
297 void gettime(struct Time *now)
303 now->usec = t.tv_usec;
307 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
309 return (t1->sec - t2->sec)/60;
312 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
314 return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
317 /* Uptime in milliseconds */
318 uint32_t getuptime(struct Time *t)
320 /* Maximum uptime is about 49/2 days */
321 return cmpmtime(t, &start_time);
324 /* Uptime in minutes */
325 uint32_t getuptime_minutes(struct Time *t)
327 /* Maximum uptime is about 49/2 days */
328 return cmpMtime(t, &start_time);
331 hash_t hash_flow(struct Flow *flow)
333 if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
334 else return hash(flow, sizeof(struct Flow_TL));
337 uint16_t snmp_index(char *name) {
340 if (!*name) return 0;
342 for (i = 0; (int) i < nsnmp_rules; i++) {
343 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
344 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
347 if ((i = if_nametoindex(name))) return i;
352 inline void copy_flow(struct Flow *src, struct Flow *dst)
359 dst->proto = src->proto;
360 dst->tcp_flags = src->tcp_flags;
364 dst->pkts = src->pkts;
365 dst->size = src->size;
366 dst->sizeF = src->sizeF;
367 dst->sizeP = src->sizeP;
368 dst->ctime = src->ctime;
369 dst->mtime = src->mtime;
370 dst->flags = src->flags;
373 void update_cur_epoch_file(int n) {
376 len=snprintf(snum,6,"%d",n);
377 fd = open("/tmp/fprobe_last_epoch",O_WRONLY|O_CREAT);
379 my_log(LOG_ERR, "open() failed: /tmp/fprobe_last_epoch");
382 write(fd, snum, len);
386 unsigned get_log_fd(char *fname, unsigned cur_fd) {
391 cur_uptime = getuptime_minutes(&now);
393 /* Epoch length in minutes */
394 if ((cur_uptime - prev_uptime) > epoch_length || cur_fd==-1) {
395 char nextname[MAX_PATH_LEN];
397 prev_uptime = cur_uptime;
398 cur_epoch = (cur_epoch + 1) % log_epochs;
400 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
401 if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC)) < 0) {
402 my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
405 update_cur_epoch_file(cur_epoch);
413 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
415 struct Flow **flowpp;
421 if (prev) flowpp = *prev;
424 if (where->sip.s_addr == what->sip.s_addr
425 && where->dip.s_addr == what->dip.s_addr
426 && where->proto == what->proto) {
427 switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
429 /* Both unfragmented */
430 if ((what->sp == where->sp)
431 && (what->dp == where->dp)) goto done;
434 /* Both fragmented */
435 if (where->id == what->id) goto done;
439 flowpp = &where->next;
443 if (prev) *prev = flowpp;
447 int put_into(struct Flow *flow, int flag
448 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
455 struct Flow *flown, **flowpp;
456 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
461 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
462 sprintf(buf, " %x H:%04x", (unsigned) flow, h);
465 pthread_mutex_lock(&flows_mutex[h]);
467 if (!(flown = find(flows[h], flow, &flowpp))) {
468 /* No suitable flow found - add */
469 if (flag == COPY_INTO) {
470 if ((flown = mem_alloc())) {
471 copy_flow(flow, flown);
474 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
475 my_log(LOG_ERR, "%s %s. %s",
476 "mem_alloc():", strerror(errno), "packet lost");
481 flow->next = flows[h];
483 #if ((DEBUG) & DEBUG_I)
485 if (flow->flags & FLOW_FRAG) flows_fragmented++;
487 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
489 sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
494 /* Found suitable flow - update */
495 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
496 sprintf(buf, " +> %x", (unsigned) flown);
499 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
500 flown->mtime = flow->mtime;
501 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
502 flown->ctime = flow->ctime;
503 flown->tcp_flags |= flow->tcp_flags;
504 flown->size += flow->size;
505 flown->pkts += flow->pkts;
506 if (flow->flags & FLOW_FRAG) {
507 /* Fragmented flow requires some additional work */
508 if (flow->flags & FLOW_TL) {
511 Several packets with FLOW_TL (attack)
513 flown->sp = flow->sp;
514 flown->dp = flow->dp;
516 if (flow->flags & FLOW_LASTFRAG) {
519 Several packets with FLOW_LASTFRAG (attack)
521 flown->sizeP = flow->sizeP;
523 flown->flags |= flow->flags;
524 flown->sizeF += flow->sizeF;
525 if ((flown->flags & FLOW_LASTFRAG)
526 && (flown->sizeF >= flown->sizeP)) {
527 /* All fragments received - flow reassembled */
528 *flowpp = flown->next;
529 pthread_mutex_unlock(&flows_mutex[h]);
530 #if ((DEBUG) & DEBUG_I)
535 flown->flags &= ~FLOW_FRAG;
536 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
539 ret = put_into(flown, MOVE_INTO
540 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
546 if (flag == MOVE_INTO) mem_free(flow);
548 pthread_mutex_unlock(&flows_mutex[h]);
552 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
556 for (i = 0; i < fields; i++) {
557 #if ((DEBUG) & DEBUG_F)
558 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
561 case NETFLOW_IPV4_SRC_ADDR:
562 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
563 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
566 case NETFLOW_IPV4_DST_ADDR:
567 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
568 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
569 my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
571 p += NETFLOW_IPV4_DST_ADDR_SIZE;
574 case NETFLOW_INPUT_SNMP:
575 *((uint16_t *) p) = htons(flow->iif);
576 p += NETFLOW_INPUT_SNMP_SIZE;
579 case NETFLOW_OUTPUT_SNMP:
580 *((uint16_t *) p) = htons(flow->oif);
581 p += NETFLOW_OUTPUT_SNMP_SIZE;
584 case NETFLOW_PKTS_32:
585 *((uint32_t *) p) = htonl(flow->pkts);
586 p += NETFLOW_PKTS_32_SIZE;
589 case NETFLOW_BYTES_32:
590 *((uint32_t *) p) = htonl(flow->size);
591 p += NETFLOW_BYTES_32_SIZE;
594 case NETFLOW_FIRST_SWITCHED:
595 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
596 p += NETFLOW_FIRST_SWITCHED_SIZE;
599 case NETFLOW_LAST_SWITCHED:
600 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
601 p += NETFLOW_LAST_SWITCHED_SIZE;
604 case NETFLOW_L4_SRC_PORT:
605 *((uint16_t *) p) = flow->sp;
606 p += NETFLOW_L4_SRC_PORT_SIZE;
609 case NETFLOW_L4_DST_PORT:
610 *((uint16_t *) p) = flow->dp;
611 p += NETFLOW_L4_DST_PORT_SIZE;
615 *((uint8_t *) p) = flow->proto;
616 p += NETFLOW_PROT_SIZE;
619 case NETFLOW_SRC_TOS:
620 *((uint8_t *) p) = flow->tos;
621 p += NETFLOW_SRC_TOS_SIZE;
624 case NETFLOW_TCP_FLAGS:
625 *((uint8_t *) p) = flow->tcp_flags;
626 p += NETFLOW_TCP_FLAGS_SIZE;
629 case NETFLOW_VERSION:
630 *((uint16_t *) p) = htons(netflow->Version);
631 p += NETFLOW_VERSION_SIZE;
635 *((uint16_t *) p) = htons(emit_count);
636 p += NETFLOW_COUNT_SIZE;
640 *((uint32_t *) p) = htonl(getuptime(&emit_time));
641 p += NETFLOW_UPTIME_SIZE;
644 case NETFLOW_UNIX_SECS:
645 *((uint32_t *) p) = htonl(emit_time.sec);
646 p += NETFLOW_UNIX_SECS_SIZE;
649 case NETFLOW_UNIX_NSECS:
650 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
651 p += NETFLOW_UNIX_NSECS_SIZE;
654 case NETFLOW_FLOW_SEQUENCE:
655 //*((uint32_t *) p) = htonl(emit_sequence);
656 *((uint32_t *) p) = 0;
657 p += NETFLOW_FLOW_SEQUENCE_SIZE;
661 /* Unsupported (uint8_t) */
662 case NETFLOW_ENGINE_TYPE:
663 case NETFLOW_ENGINE_ID:
664 case NETFLOW_FLAGS7_1:
665 case NETFLOW_SRC_MASK:
666 case NETFLOW_DST_MASK:
667 *((uint8_t *) p) = 0;
668 p += NETFLOW_PAD8_SIZE;
671 *((uint16_t *) p) = flow->tos;
672 p += NETFLOW_XID_SIZE;
675 /* Unsupported (uint16_t) */
678 case NETFLOW_FLAGS7_2:
679 *((uint16_t *) p) = 0;
680 p += NETFLOW_PAD16_SIZE;
684 /* Unsupported (uint32_t) */
685 case NETFLOW_IPV4_NEXT_HOP:
686 case NETFLOW_ROUTER_SC:
687 *((uint32_t *) p) = 0;
688 p += NETFLOW_PAD32_SIZE;
692 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
693 format, i, format[i]);
697 #if ((DEBUG) & DEBUG_F)
698 my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
705 Workaround for clone()-based threads
706 Try to change EUID independently of main thread
710 setregid(pw->pw_gid, pw->pw_gid);
711 setreuid(pw->pw_uid, pw->pw_uid);
720 struct timespec timeout;
721 int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
723 p = (void *) &emit_packet + netflow->HeaderSize;
729 pthread_mutex_lock(&emit_mutex);
730 while (!flows_emit) {
731 gettimeofday(&now, 0);
732 timeout.tv_sec = now.tv_sec + emit_timeout;
733 /* Do not wait until emit_packet is filled - it may take too long */
734 if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
735 pthread_mutex_unlock(&emit_mutex);
740 flows_emit = flows_emit->next;
741 #if ((DEBUG) & DEBUG_I)
744 pthread_mutex_unlock(&emit_mutex);
748 gettime(&start_time);
749 start_time.sec -= start_time_offset;
752 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
756 printf("Emit count = %d\n", emit_count);
759 if (emit_count == netflow->MaxFlows) {
762 p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
763 size = netflow->HeaderSize + emit_count * netflow->FlowSize;
764 /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
765 if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
767 for (i = 0; i < npeers; i++) {
768 if (peers[i].type == PEER_FILE) {
769 if (netflow->SeqOffset)
770 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
771 peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
772 ret = write(peers[i].write_fd, emit_packet, size);
775 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
776 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
777 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
781 #if ((DEBUG) & DEBUG_E)
783 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
784 emit_count, i + 1, peers[i].seq);
787 peers[i].seq += emit_count;
790 if (emit_rate_bytes) {
792 delay = sent / emit_rate_bytes;
794 sent %= emit_rate_bytes;
796 timeout.tv_nsec = emit_rate_delay * delay;
797 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
802 if (peers[i].type == PEER_MIRROR) goto sendreal;
804 if (peers[i].type == PEER_ROTATE)
805 if (peer_rot_cur++ == peer_rot_work) {
807 if (netflow->SeqOffset)
808 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
809 ret = send(peers[i].write_fd, emit_packet, size, 0);
811 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
812 my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
813 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
816 #if ((DEBUG) & DEBUG_E)
818 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
819 emit_count, i + 1, peers[i].seq);
822 peers[i].seq += emit_count;
825 if (emit_rate_bytes) {
827 delay = sent / emit_rate_bytes;
829 sent %= emit_rate_bytes;
831 timeout.tv_nsec = emit_rate_delay * delay;
832 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
837 if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
838 emit_sequence += emit_count;
840 #if ((DEBUG) & DEBUG_I)
847 void *unpending_thread()
850 struct timespec timeout;
851 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
858 pthread_mutex_lock(&unpending_mutex);
861 while (!(pending_tail->flags & FLOW_PENDING)) {
862 gettimeofday(&now, 0);
863 timeout.tv_sec = now.tv_sec + unpending_timeout;
864 pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
867 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
870 if (put_into(pending_tail, COPY_INTO
871 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
875 #if ((DEBUG) & DEBUG_I)
876 pkts_lost_unpending++;
880 #if ((DEBUG) & DEBUG_U)
881 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
884 pending_tail->flags = 0;
885 pending_tail = pending_tail->next;
886 #if ((DEBUG) & DEBUG_I)
894 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
898 struct Flow *flow, **flowpp;
900 struct timespec timeout;
905 pthread_mutex_lock(&scan_mutex);
909 timeout.tv_sec = now.sec + scan_interval;
910 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
913 #if ((DEBUG) & DEBUG_S)
914 my_log(LOG_DEBUG, "S: %d", now.sec);
916 for (i = 0; i < 1 << HASH_BITS ; i++) {
917 pthread_mutex_lock(&flows_mutex[i]);
921 if (flow->flags & FLOW_FRAG) {
922 /* Process fragmented flow */
923 if ((now.sec - flow->mtime.sec) > frag_lifetime) {
924 /* Fragmented flow expired - put it into special chain */
925 #if ((DEBUG) & DEBUG_I)
929 *flowpp = flow->next;
931 flow->flags &= ~FLOW_FRAG;
932 flow->next = scan_frag_dreg;
933 scan_frag_dreg = flow;
938 /* Flow is not fragmented */
939 if ((now.sec - flow->mtime.sec) > inactive_lifetime
940 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
942 #if ((DEBUG) & DEBUG_S)
943 my_log(LOG_DEBUG, "S: E %x", flow);
945 #if ((DEBUG) & DEBUG_I)
948 *flowpp = flow->next;
949 pthread_mutex_lock(&emit_mutex);
950 flow->next = flows_emit;
952 #if ((DEBUG) & DEBUG_I)
955 pthread_mutex_unlock(&emit_mutex);
960 flowpp = &flow->next;
963 pthread_mutex_unlock(&flows_mutex[i]);
965 if (flows_emit) pthread_cond_signal(&emit_cond);
967 while (scan_frag_dreg) {
968 flow = scan_frag_dreg;
969 scan_frag_dreg = flow->next;
970 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
973 put_into(flow, MOVE_INTO
974 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
978 #if ((DEBUG) & DEBUG_S)
979 my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
987 struct ulog_packet_msg *ulog_msg;
991 int len, off_frag, psize;
992 #if ((DEBUG) & DEBUG_C)
1000 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1002 my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1005 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1007 #if ((DEBUG) & DEBUG_C)
1008 sprintf(logbuf, "C: %d", ulog_msg->data_len);
1011 nl = (void *) &ulog_msg->payload;
1012 psize = ulog_msg->data_len;
1015 if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1016 #if ((DEBUG) & DEBUG_C)
1017 strcat(logbuf, " U");
1018 my_log(LOG_DEBUG, "%s", logbuf);
1020 #if ((DEBUG) & DEBUG_I)
1026 if (pending_head->flags) {
1027 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1029 # if ((DEBUG) & DEBUG_C)
1034 "pending queue full:", "packet lost");
1036 #if ((DEBUG) & DEBUG_I)
1037 pkts_lost_capture++;
1042 #if ((DEBUG) & DEBUG_I)
1046 flow = pending_head;
1048 /* ?FIXME? Add sanity check for ip_len? */
1049 flow->size = ntohs(nl->ip_len);
1050 #if ((DEBUG) & DEBUG_I)
1051 size_total += flow->size;
1054 flow->sip = nl->ip_src;
1055 flow->dip = nl->ip_dst;
1056 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
1057 my_log(LOG_INFO, "Received test flow to corewars.org");
1059 flow->iif = snmp_index(ulog_msg->indev_name);
1060 flow->oif = snmp_index(ulog_msg->outdev_name);
1061 flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1062 flow->proto = nl->ip_p;
1064 flow->tcp_flags = 0;
1068 /* Packets captured from the OUTPUT table do not contain a valid timestamp */
1069 if (ulog_msg->timestamp_sec) {
1070 flow->ctime.sec = ulog_msg->timestamp_sec;
1071 flow->ctime.usec = ulog_msg->timestamp_usec;
1072 } else gettime(&flow->ctime);
1073 flow->mtime = flow->ctime;
1075 off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1078 Offset (from network layer) to transport layer header/IP data
1079 IOW IP header size ;-)
1082 Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1084 off_tl = nl->ip_hl << 2;
1085 tl = (void *) nl + off_tl;
1087 /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1088 flow->sizeF = ntohs(nl->ip_len) - off_tl;
1090 if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1091 if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1093 if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1094 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1095 #if ((DEBUG) & DEBUG_C)
1096 strcat(logbuf, " F");
1098 #if ((DEBUG) & DEBUG_I)
1099 pkts_total_fragmented++;
1101 flow->flags |= FLOW_FRAG;
1102 flow->id = nl->ip_id;
1104 if (!(ntohs(nl->ip_off) & IP_MF)) {
1105 /* Packet without IP_MF (the last fragment) carries the whole datagram size */
1106 flow->flags |= FLOW_LASTFRAG;
1107 /* size = frag_offset*8 + data_size */
1108 flow->sizeP = off_frag + flow->sizeF;
1112 #if ((DEBUG) & DEBUG_C)
1113 sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1114 strcat(logbuf, buf);
1115 sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1116 strcat(logbuf, buf);
1120 Fortunately most interesting transport layer information fit
1121 into first 8 bytes of IP data field (minimal nonzero size).
1122 Thus we don't need actual packet reassembling to build whole
1123 transport layer data. We only check the fragment offset for
1124 zero value to find packet with this information.
1126 if (!off_frag && psize >= 8) {
1127 switch (flow->proto) {
1130 flow->sp = ((struct udphdr *)tl)->uh_sport;
1131 flow->dp = ((struct udphdr *)tl)->uh_dport;
1136 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1137 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1140 #ifdef ICMP_TRICK_CISCO
1142 flow->dp = *((int32_t *) tl);
1147 /* Unknown transport layer */
1148 #if ((DEBUG) & DEBUG_C)
1149 strcat(logbuf, " U");
1156 #if ((DEBUG) & DEBUG_C)
1157 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1158 strcat(logbuf, buf);
1160 flow->flags |= FLOW_TL;
1164 /* Check for tcp flags presence (including CWR and ECE). */
1165 if (flow->proto == IPPROTO_TCP
1167 && psize >= 16 - off_frag) {
1168 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1169 #if ((DEBUG) & DEBUG_C)
1170 sprintf(buf, " TCP:%x", flow->tcp_flags);
1171 strcat(logbuf, buf);
1175 #if ((DEBUG) & DEBUG_C)
1176 sprintf(buf, " => %x", (unsigned) flow);
1177 strcat(logbuf, buf);
1178 my_log(LOG_DEBUG, "%s", logbuf);
1181 #if ((DEBUG) & DEBUG_I)
1183 pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1184 if (pending_queue_trace < pending_queue_trace_candidate)
1185 pending_queue_trace = pending_queue_trace_candidate;
1188 /* Flow complete - inform unpending_thread() about it */
1189 pending_head->flags |= FLOW_PENDING;
1190 pending_head = pending_head->next;
1192 pthread_cond_signal(&unpending_cond);
1198 int main(int argc, char **argv)
1201 char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1202 int c, i, write_fd, memory_limit = 0;
1203 struct addrinfo hints, *res;
1204 struct sockaddr_in saddr;
1205 pthread_attr_t tattr;
1206 struct sigaction sigact;
1207 static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1208 struct timeval timeout;
1210 sched_min = sched_get_priority_min(SCHED);
1211 sched_max = sched_get_priority_max(SCHED);
1213 memset(&saddr, 0 , sizeof(saddr));
1214 memset(&hints, 0 , sizeof(hints));
1215 hints.ai_flags = AI_PASSIVE;
1216 hints.ai_family = AF_INET;
1217 hints.ai_socktype = SOCK_DGRAM;
1219 /* Process command line options */
1222 while ((c = my_getopt(argc, argv, parms)) != -1) {
1232 if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1233 if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1234 if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1235 if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1236 if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1237 if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1238 if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1239 if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1240 if (parms[nflag].count) {
1241 switch (atoi(parms[nflag].arg)) {
1243 netflow = &NetFlow1;
1250 netflow = &NetFlow7;
1254 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1258 if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1259 if (parms[lflag].count) {
1260 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1263 sprintf(errpbuf, "[%s]", log_suffix);
1264 strcat(ident, errpbuf);
1267 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1268 if (log_suffix) *--log_suffix = ':';
1270 if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1272 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1275 sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1276 if (parms[qflag].count) {
1277 pending_queue_length = atoi(parms[qflag].arg);
1278 if (pending_queue_length < 1) {
1279 fprintf(stderr, "Illegal %s\n", "pending queue length");
1283 if (parms[rflag].count) {
1284 schedp.sched_priority = atoi(parms[rflag].arg);
1285 if (schedp.sched_priority
1286 && (schedp.sched_priority < sched_min
1287 || schedp.sched_priority > sched_max)) {
1288 fprintf(stderr, "Illegal %s\n", "realtime priority");
1292 if (parms[Bflag].count) {
1293 sockbufsize = atoi(parms[Bflag].arg) << 10;
1295 if (parms[bflag].count) {
1296 bulk_quantity = atoi(parms[bflag].arg);
1297 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1298 fprintf(stderr, "Illegal %s\n", "bulk size");
1302 if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1303 if (parms[Xflag].count) {
1304 for(i = 0; parms[Xflag].arg[i]; i++)
1305 if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1306 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1308 rule = strtok(parms[Xflag].arg, ":");
1309 for (i = 0; rule; i++) {
1310 snmp_rules[i].len = strlen(rule);
1311 if (snmp_rules[i].len > IFNAMSIZ) {
1312 fprintf(stderr, "Illegal %s\n", "interface basename");
1315 strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1316 if (!*(rule - 1)) *(rule - 1) = ',';
1317 rule = strtok(NULL, ",");
1319 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1322 snmp_rules[i].base = atoi(rule);
1324 rule = strtok(NULL, ":");
1328 if (parms[tflag].count)
1329 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1330 if (parms[aflag].count) {
1331 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1333 fprintf(stderr, "Illegal %s\n", "source address");
1336 saddr = *((struct sockaddr_in *) res->ai_addr);
1340 if (parms[uflag].count)
1341 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1342 fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1347 /* Process collectors parameters. Brrrr... :-[ */
1349 npeers = argc - optind;
1351 /* Send to remote Netflow collector */
1352 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1353 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1355 if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1357 if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1358 fprintf(stderr, "socket(): %s\n", strerror(errno));
1361 peers[npeers].write_fd = write_fd;
1362 peers[npeers].type = PEER_MIRROR;
1363 peers[npeers].laddr = saddr;
1364 peers[npeers].seq = 0;
1365 if ((lhost = strchr(dport, '/'))) {
1367 if ((type = strchr(lhost, '/'))) {
1375 peers[npeers].type = PEER_ROTATE;
1384 if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1385 peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1389 if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1390 sizeof(struct sockaddr_in))) {
1391 fprintf(stderr, "bind(): %s\n", strerror(errno));
1394 if (getaddrinfo(dhost, dport, &hints, &res)) {
1396 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1399 peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1401 if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1402 sizeof(struct sockaddr_in))) {
1403 fprintf(stderr, "connect(): %s\n", strerror(errno));
1407 /* Restore command line */
1408 if (type) *--type = '/';
1409 if (lhost) *--lhost = '/';
1413 else if (parms[fflag].count) {
1415 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1416 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1417 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1419 peers[npeers].write_fd = -1;
1420 peers[npeers].type = PEER_FILE;
1421 peers[npeers].seq = 0;
1428 if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1429 ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1431 fprintf(stderr, "libipulog initialization error: %s",
1432 ipulog_strerror(ipulog_errno));
1436 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1437 &sockbufsize, sizeof(sockbufsize)) < 0)
1438 fprintf(stderr, "setsockopt(): %s", strerror(errno));
1440 /* Daemonize (if log destination stdout-free) */
1442 my_log_open(ident, verbosity, log_dest);
1443 if (!(log_dest & 2)) {
1446 fprintf(stderr, "fork(): %s", strerror(errno));
1451 freopen("/dev/null", "r", stdin);
1452 freopen("/dev/null", "w", stdout);
1453 freopen("/dev/null", "w", stderr);
1460 setvbuf(stdout, (char *)0, _IONBF, 0);
1461 setvbuf(stderr, (char *)0, _IONBF, 0);
1465 sprintf(errpbuf, "[%ld]", (long) pid);
1466 strcat(ident, errpbuf);
1468 /* Initialization */
1470 hash_init(); /* Actually for crc16 only */
1471 mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1472 for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1475 /* Hope 12 days is enough :-/ */
1476 start_time_offset = 1 << 20;
1478 /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1480 gettime(&start_time);
1483 Build static pending queue as circular buffer.
1485 if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1486 pending_tail = pending_head;
1487 for (i = pending_queue_length - 1; i--;) {
1488 if (!(pending_tail->next = mem_alloc())) {
1490 my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1493 pending_tail = pending_tail->next;
1495 pending_tail->next = pending_head;
1496 pending_tail = pending_head;
1498 sigemptyset(&sig_mask);
1499 sigact.sa_handler = &sighandler;
1500 sigact.sa_mask = sig_mask;
1501 sigact.sa_flags = 0;
1502 sigaddset(&sig_mask, SIGTERM);
1503 sigaction(SIGTERM, &sigact, 0);
1504 #if ((DEBUG) & DEBUG_I)
1505 sigaddset(&sig_mask, SIGUSR1);
1506 sigaction(SIGUSR1, &sigact, 0);
1508 if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1509 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1513 my_log(LOG_INFO, "Starting %s...", VERSION);
1515 if (parms[cflag].count) {
1516 if (chdir(parms[cflag].arg) || chroot(".")) {
1517 my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1522 schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1523 pthread_attr_init(&tattr);
1524 for (i = 0; i < THREADS - 1; i++) {
1525 if (schedp.sched_priority > 0) {
1526 if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1527 (pthread_attr_setschedparam(&tattr, &schedp))) {
1528 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1532 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1533 my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1536 pthread_detach(thid);
1537 schedp.sched_priority++;
1541 if (setgroups(0, NULL)) {
1542 my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1545 if (setregid(pw->pw_gid, pw->pw_gid)) {
1546 my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1549 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1550 my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1555 if (!(pidfile = fopen(pidfilepath, "w")))
1556 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1558 fprintf(pidfile, "%ld\n", (long) pid);
1562 my_log(LOG_INFO, "pid: %d", pid);
1563 my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1564 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1565 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1566 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1567 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1568 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1569 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1570 for (i = 0; i < nsnmp_rules; i++) {
1571 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1572 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1574 for (i = 0; i < npeers; i++) {
1575 switch (peers[i].type) {
1583 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1584 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1585 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1588 pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1590 timeout.tv_usec = 0;
1592 || (total_elements - free_elements - pending_queue_length)
1594 || pending_tail->flags) {
1597 timeout.tv_sec = scan_interval;
1598 select(0, 0, 0, 0, &timeout);
1601 if (sigs & SIGTERM_MASK && !killed) {
1602 sigs &= ~SIGTERM_MASK;
1603 my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1606 active_lifetime = -1;
1607 inactive_lifetime = -1;
1609 unpending_timeout = 1;
1611 pthread_cond_signal(&scan_cond);
1612 pthread_cond_signal(&unpending_cond);
1615 #if ((DEBUG) & DEBUG_I)
1616 if (sigs & SIGUSR1_MASK) {
1617 sigs &= ~SIGUSR1_MASK;
1622 remove(pidfilepath);
1623 #if ((DEBUG) & DEBUG_I)
1626 my_log(LOG_INFO, "Done.");