Re-import of fprobe-ulog
[fprobe-ulog.git] / trunk / src / fprobe-ulog.c.old
1 /*
2         Copyright (C) Slava Astashonok <sla@0n.ru>
3
4         This program is free software; you can redistribute it and/or
5         modify it under the terms of the GNU General Public License.
6
7         $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
8 */
9
10 #include <common.h>
11
12 /* stdout, stderr, freopen() */
13 #include <stdio.h>
14
15 /* atoi(), exit() */
16 #include <stdlib.h>
17
18 /* getopt(), alarm(), getpid(), sedsid(), chdir() */
19 #include <unistd.h>
20
21 /* strerror() */
22 #include <string.h>
23
24 /* sig*() */
25 #include <signal.h>
26
27 #include <libipulog/libipulog.h>
28 struct ipulog_handle {
29         int fd;
30         u_int8_t blocking;
31         struct sockaddr_nl local;
32         struct sockaddr_nl peer;
33         struct nlmsghdr* last_nlhdr;
34 };
35
36 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
37 #include <sys/types.h>
38 #include <netinet/in_systm.h>
39 #include <sys/socket.h>
40 #include <netinet/in.h>
41 #include <arpa/inet.h>
42 #include <netinet/ip.h>
43 #include <netinet/tcp.h>
44 #include <netinet/udp.h>
45 #include <netinet/ip_icmp.h>
46 #include <net/if.h>
47
48 #include <sys/param.h>
49 #include <pwd.h>
50 #ifdef OS_LINUX
51 #include <grp.h>
52 #endif
53
54 /* pthread_*() */
55 #include <pthread.h>
56
57 /* errno */
58 #include <errno.h>
59
60 /* getaddrinfo() */
61 #include <netdb.h>
62
63 /* nanosleep() */
64 #include <time.h>
65
66 /* gettimeofday() */
67 #include <sys/time.h>
68
69 /* scheduling */
70 #include <sched.h>
71
72 /* select() (POSIX)*/
73 #include <sys/select.h>
74
75 /* open() */
76 #include <sys/stat.h>
77 #include <fcntl.h>
78
79 #include <fprobe-ulog.h>
80 #include <my_log.h>
81 #include <my_getopt.h>
82 #include <netflow.h>
83 #include <hash.h>
84 #include <mem.h>
85
86 enum {
87         aflag,
88         Bflag,
89         bflag,
90         cflag,
91         dflag,
92         eflag,
93         gflag,
94         hflag,
95         lflag,
96         mflag,
97         Mflag,
98         nflag,
99         qflag,
100         rflag,
101         sflag,
102         tflag,
103         Uflag,
104         uflag,
105         vflag,
106         Xflag,
107 };
108
109 static struct getopt_parms parms[] = {
110         {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
111         {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
112         {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
113         {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
114         {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
115         {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
116         {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
117         {'h', 0, 0, 0},
118         {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
119         {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
120         {'M', 0, 0, 0},
121         {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
122         {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
123         {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
124         {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
125         {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
126         {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
127         {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
128         {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
129         {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
130         {0, 0, 0, 0}
131 };
132
133 extern char *optarg;
134 extern int optind, opterr, optopt;
135 extern int errno;
136
137 extern struct NetFlow NetFlow1;
138 extern struct NetFlow NetFlow5;
139 extern struct NetFlow NetFlow7;
140
141 #define mark_is_tos parms[Mflag].count
142 static unsigned scan_interval = 5;
143 static int frag_lifetime = 30;
144 static int inactive_lifetime = 60;
145 static int active_lifetime = 300;
146 static int sockbufsize;
147 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
148 #if (MEM_BITS == 0) || (MEM_BITS == 16)
149 #define BULK_QUANTITY 10000
150 #else
151 #define BULK_QUANTITY 200
152 #endif
153 static unsigned bulk_quantity = BULK_QUANTITY;
154 static unsigned pending_queue_length = 100;
155 static struct NetFlow *netflow = &NetFlow5;
156 static unsigned verbosity = 6;
157 static unsigned log_dest = MY_LOG_SYSLOG;
158 static struct Time start_time;
159 static long start_time_offset;
160 static int off_tl;
161 /* From mem.c */
162 extern unsigned total_elements;
163 extern unsigned free_elements;
164 extern unsigned total_memory;
165 #if ((DEBUG) & DEBUG_I)
166 static unsigned emit_pkts, emit_queue;
167 static uint64_t size_total;
168 static unsigned pkts_total, pkts_total_fragmented;
169 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
170 static unsigned pkts_pending, pkts_pending_done;
171 static unsigned pending_queue_trace, pending_queue_trace_candidate;
172 static unsigned flows_total, flows_fragmented;
173 #endif
174 static unsigned emit_count;
175 static uint32_t emit_sequence;
176 static unsigned emit_rate_bytes, emit_rate_delay;
177 static struct Time emit_time;
178 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
179 static pthread_t thid;
180 static sigset_t sig_mask;
181 static struct sched_param schedp;
182 static int sched_min, sched_max;
183 static int npeers, npeers_rot;
184 static struct peer *peers;
185 static int sigs;
186
187 static struct Flow *flows[1 << HASH_BITS];
188 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
189
190 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
191 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
192
193 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
194 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
195 static struct Flow *pending_head, *pending_tail;
196 static struct Flow *scan_frag_dreg;
197
198 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
199 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
200 static struct Flow *flows_emit;
201
202 static char ident[256] = "fprobe-ulog";
203 static FILE *pidfile;
204 static char *pidfilepath;
205 static pid_t pid;
206 static int killed;
207 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
208 static struct ipulog_handle *ulog_handle;
209 static uint32_t ulog_gmask = 1;
210 static char *cap_buf;
211 static int nsnmp_rules;
212 static struct snmp_rule *snmp_rules;
213 static struct passwd *pw = 0;
214
215 void usage()
216 {
217         fprintf(stdout,
218                 "fprobe-ulog: a NetFlow probe. Version %s\n"
219                 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
220                 "\n"
221                 "-h\t\tDisplay this help\n"
222                 "-U <mask>\tULOG group bitwise mask [1]\n"
223                 "-s <seconds>\tHow often scan for expired flows [5]\n"
224                 "-g <seconds>\tFragmented flow lifetime [30]\n"
225                 "-d <seconds>\tIdle flow lifetime (inactive timer) [60]\n"
226                 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
227                 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
228                 "-a <address>\tUse <address> as source for NetFlow flow\n"
229                 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
230                 "-M\t\tUse netfilter mark value as ToS flag\n"
231                 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
232                 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
233                 "-q <flows>\tPending queue length [100]\n"
234                 "-B <kilobytes>\tKernel capture buffer size [0]\n"
235                 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
236                 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
237                 "-c <directory>\tDirectory to chroot to\n"
238                 "-u <user>\tUser to run as\n"
239                 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
240                 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
241                 "remote:port\tAddress of the NetFlow collector\n",
242                 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
243         exit(0);
244 }
245
246 #if ((DEBUG) & DEBUG_I)
247 void info_debug()
248 {
249         my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
250                 pkts_total, pkts_total_fragmented, size_total,
251                 pkts_pending - pkts_pending_done, pending_queue_trace);
252         my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
253                 pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
254         my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
255                 flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
256         my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
257                 total_elements, free_elements, total_memory);
258 }
259 #endif
260
261 void sighandler(int sig)
262 {
263         switch (sig) {
264                 case SIGTERM:
265                         sigs |= SIGTERM_MASK;
266                         break;
267 #if ((DEBUG) & DEBUG_I)
268                 case SIGUSR1:
269                         sigs |= SIGUSR1_MASK;
270                         break;
271 #endif
272         }
273 }
274
275 void gettime(struct Time *now)
276 {
277         struct timeval t;
278
279         gettimeofday(&t, 0);
280         now->sec = t.tv_sec;
281         now->usec = t.tv_usec;
282 }
283
284 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
285 {
286         return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
287 }
288
289 /* Uptime in miliseconds */
290 uint32_t getuptime(struct Time *t)
291 {
292         /* Maximum uptime is about 49/2 days */
293         return cmpmtime(t, &start_time);
294 }
295
296 hash_t hash_flow(struct Flow *flow)
297 {
298         if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
299         else return hash(flow, sizeof(struct Flow_TL));
300 }
301
302 uint16_t snmp_index(char *name) {
303         uint32_t i;
304
305         if (!*name) return 0;
306
307         for (i = 0; (int) i < nsnmp_rules; i++) {
308                 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
309                 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
310         }
311
312         if ((i = if_nametoindex(name))) return i;
313
314         return -1;
315 }
316
317 inline void copy_flow(struct Flow *src, struct Flow *dst)
318 {
319         dst->iif = src->iif;
320         dst->oif = src->oif;
321         dst->sip = src->sip;
322         dst->dip = src->dip;
323         dst->tos = src->tos;
324         dst->proto = src->proto;
325         dst->tcp_flags = src->tcp_flags;
326         dst->id = src->id;
327         dst->sp = src->sp;
328         dst->dp = src->dp;
329         dst->pkts = src->pkts;
330         dst->size = src->size;
331         dst->sizeF = src->sizeF;
332         dst->sizeP = src->sizeP;
333         dst->ctime = src->ctime;
334         dst->mtime = src->mtime;
335         dst->flags = src->flags;
336 }
337
338 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
339 {
340         struct Flow **flowpp;
341
342 #ifdef WALL
343         flowpp = 0;
344 #endif
345
346         if (prev) flowpp = *prev;
347
348         while (where) {
349                 if (where->sip.s_addr == what->sip.s_addr
350                         && where->dip.s_addr == what->dip.s_addr
351                         && where->proto == what->proto) {
352                         switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
353                                 case 0:
354                                         /* Both unfragmented */
355                                         if ((what->sp == where->sp)
356                                                 && (what->dp == where->dp)) goto done;
357                                         break;
358                                 case 2:
359                                         /* Both fragmented */
360                                         if (where->id == what->id) goto done;
361                                         break;
362                         }
363                 }
364                 flowpp = &where->next;
365                 where = where->next;
366         }
367 done:
368         if (prev) *prev = flowpp;
369         return where;
370 }
371
372 int put_into(struct Flow *flow, int flag
373 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
374         , char *logbuf
375 #endif
376 )
377 {
378         int ret = 0;
379         hash_t h;
380         struct Flow *flown, **flowpp;
381 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
382         char buf[64];
383 #endif
384
385         h = hash_flow(flow);
386 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
387         sprintf(buf, " %x H:%04x", (unsigned) flow, h);
388         strcat(logbuf, buf);
389 #endif
390         pthread_mutex_lock(&flows_mutex[h]);
391         flowpp = &flows[h];
392         if (!(flown = find(flows[h], flow, &flowpp))) {
393                 /* No suitable flow found - add */
394                 if (flag == COPY_INTO) {
395                         if ((flown = mem_alloc())) {
396                                 copy_flow(flow, flown);
397                                 flow = flown;
398                         } else {
399 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
400                                 my_log(LOG_ERR, "%s %s. %s",
401                                         "mem_alloc():", strerror(errno), "packet lost");
402 #endif
403                                 return -1;
404                         }
405                 }
406                 flow->next = flows[h];
407                 flows[h] = flow;
408 #if ((DEBUG) & DEBUG_I)
409                 flows_total++;
410                 if (flow->flags & FLOW_FRAG) flows_fragmented++;
411 #endif
412 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
413                 if (flown) {
414                         sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
415                         strcat(logbuf, buf);
416                 }
417 #endif
418         } else {
419                 /* Found suitable flow - update */
420 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
421                 sprintf(buf, " +> %x", (unsigned) flown);
422                 strcat(logbuf, buf);
423 #endif
424                 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
425                         flown->mtime = flow->mtime;
426                 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
427                         flown->ctime = flow->ctime;
428                 flown->tcp_flags |= flow->tcp_flags;
429                 flown->size += flow->size;
430                 flown->pkts += flow->pkts;
431                 if (flow->flags & FLOW_FRAG) {
432                         /* Fragmented flow require some additional work */
433                         if (flow->flags & FLOW_TL) {
434                                 /*
435                                 ?FIXME?
436                                 Several packets with FLOW_TL (attack)
437                                 */
438                                 flown->sp = flow->sp;
439                                 flown->dp = flow->dp;
440                         }
441                         if (flow->flags & FLOW_LASTFRAG) {
442                                 /*
443                                 ?FIXME?
444                                 Several packets with FLOW_LASTFRAG (attack)
445                                 */
446                                 flown->sizeP = flow->sizeP;
447                         }
448                         flown->flags |= flow->flags;
449                         flown->sizeF += flow->sizeF;
450                         if ((flown->flags & FLOW_LASTFRAG)
451                                 && (flown->sizeF >= flown->sizeP)) {
452                                 /* All fragments received - flow reassembled */
453                                 *flowpp = flown->next;
454                                 pthread_mutex_unlock(&flows_mutex[h]);
455 #if ((DEBUG) & DEBUG_I)
456                                 flows_total--;
457                                 flows_fragmented--;
458 #endif
459                                 flown->id = 0;
460                                 flown->flags &= ~FLOW_FRAG;
461 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
462                                 strcat(logbuf," R");
463 #endif
464                                 ret = put_into(flown, MOVE_INTO
465 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
466                                                 , logbuf
467 #endif
468                                         );
469                         }
470                 }
471                 if (flag == MOVE_INTO) mem_free(flow);
472         }
473         pthread_mutex_unlock(&flows_mutex[h]);
474         return ret;
475 }
476
477 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
478 {
479         int i;
480
481         for (i = 0; i < fields; i++) {
482 #if ((DEBUG) & DEBUG_F)
483                 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
484 #endif
485                 switch (format[i]) {
486                         case NETFLOW_IPV4_SRC_ADDR:
487                                 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
488                                 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
489                                 break;
490
491                         case NETFLOW_IPV4_DST_ADDR:
492                                 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
493                                 p += NETFLOW_IPV4_DST_ADDR_SIZE;
494                                 break;
495
496                         case NETFLOW_INPUT_SNMP:
497                                 *((uint16_t *) p) = htons(flow->iif);
498                                 p += NETFLOW_INPUT_SNMP_SIZE;
499                                 break;
500
501                         case NETFLOW_OUTPUT_SNMP:
502                                 *((uint16_t *) p) = htons(flow->oif);
503                                 p += NETFLOW_OUTPUT_SNMP_SIZE;
504                                 break;
505
506                         case NETFLOW_PKTS_32:
507                                 *((uint32_t *) p) = htonl(flow->pkts);
508                                 p += NETFLOW_PKTS_32_SIZE;
509                                 break;
510
511                         case NETFLOW_BYTES_32:
512                                 *((uint32_t *) p) = htonl(flow->size);
513                                 p += NETFLOW_BYTES_32_SIZE;
514                                 break;
515
516                         case NETFLOW_FIRST_SWITCHED:
517                                 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
518                                 p += NETFLOW_FIRST_SWITCHED_SIZE;
519                                 break;
520
521                         case NETFLOW_LAST_SWITCHED:
522                                 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
523                                 p += NETFLOW_LAST_SWITCHED_SIZE;
524                                 break;
525
526                         case NETFLOW_L4_SRC_PORT:
527                                 *((uint16_t *) p) = flow->sp;
528                                 p += NETFLOW_L4_SRC_PORT_SIZE;
529                                 break;
530
531                         case NETFLOW_L4_DST_PORT:
532                                 *((uint16_t *) p) = flow->dp;
533                                 p += NETFLOW_L4_DST_PORT_SIZE;
534                                 break;
535
536                         case NETFLOW_PROT:
537                                 *((uint8_t *) p) = flow->proto;
538                                 p += NETFLOW_PROT_SIZE;
539                                 break;
540
541                         case NETFLOW_SRC_TOS:
542                                 *((uint8_t *) p) = flow->tos;
543                                 p += NETFLOW_SRC_TOS_SIZE;
544                                 break;
545
546                         case NETFLOW_TCP_FLAGS:
547                                 *((uint8_t *) p) = flow->tcp_flags;
548                                 p += NETFLOW_TCP_FLAGS_SIZE;
549                                 break;
550
551                         case NETFLOW_VERSION:
552                                 *((uint16_t *) p) = htons(netflow->Version);
553                                 p += NETFLOW_VERSION_SIZE;
554                                 break;
555
556                         case NETFLOW_COUNT:
557                                 *((uint16_t *) p) = htons(emit_count);
558                                 p += NETFLOW_COUNT_SIZE;
559                                 break;
560
561                         case NETFLOW_UPTIME:
562                                 *((uint32_t *) p) = htonl(getuptime(&emit_time));
563                                 p += NETFLOW_UPTIME_SIZE;
564                                 break;
565
566                         case NETFLOW_UNIX_SECS:
567                                 *((uint32_t *) p) = htonl(emit_time.sec);
568                                 p += NETFLOW_UNIX_SECS_SIZE;
569                                 break;
570
571                         case NETFLOW_UNIX_NSECS:
572                                 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
573                                 p += NETFLOW_UNIX_NSECS_SIZE;
574                                 break;
575
576                         case NETFLOW_FLOW_SEQUENCE:
577                                 //*((uint32_t *) p) = htonl(emit_sequence);
578                                 *((uint32_t *) p) = 0;
579                                 p += NETFLOW_FLOW_SEQUENCE_SIZE;
580                                 break;
581
582                         case NETFLOW_PAD8:
583                         /* Unsupported (uint8_t) */
584                         case NETFLOW_ENGINE_TYPE:
585                         case NETFLOW_ENGINE_ID:
586                         case NETFLOW_FLAGS7_1:
587                         case NETFLOW_SRC_MASK:
588                         case NETFLOW_DST_MASK:
589                                 *((uint8_t *) p) = 0;
590                                 p += NETFLOW_PAD8_SIZE;
591                                 break;
592
593                         case NETFLOW_PAD16:
594                         /* Unsupported (uint16_t) */
595                         case NETFLOW_SRC_AS:
596                         case NETFLOW_DST_AS:
597                         case NETFLOW_FLAGS7_2:
598                                 *((uint16_t *) p) = 0;
599                                 p += NETFLOW_PAD16_SIZE;
600                                 break;
601
602                         case NETFLOW_PAD32:
603                         /* Unsupported (uint32_t) */
604                         case NETFLOW_IPV4_NEXT_HOP:
605                         case NETFLOW_ROUTER_SC:
606                                 *((uint32_t *) p) = 0;
607                                 p += NETFLOW_PAD32_SIZE;
608                                 break;
609
610                         default:
611                                 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
612                                         format, i, format[i]);
613                                 exit(1);
614                 }
615         }
616 #if ((DEBUG) & DEBUG_F)
617         my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
618 #endif
619         return p;
620 }
621
622 void setuser() {
623         /*
624         Workaround for clone()-based threads
625         Try to change EUID independently of main thread
626         */
627         if (pw) {
628                 setgroups(0, NULL);
629                 setregid(pw->pw_gid, pw->pw_gid);
630                 setreuid(pw->pw_uid, pw->pw_uid);
631         }
632 }
633
634 void *emit_thread()
635 {
636         struct Flow *flow;
637         void *p;
638         struct timeval now;
639         struct timespec timeout;
640         int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
641
642         p = (void *) &emit_packet + netflow->HeaderSize;
643         timeout.tv_nsec = 0;
644
645         setuser();
646
647         for (;;) {
648                 pthread_mutex_lock(&emit_mutex);
649                 while (!flows_emit) {
650                         gettimeofday(&now, 0);
651                         timeout.tv_sec = now.tv_sec + emit_timeout;
652                         /* Do not wait until emit_packet will filled - it may be too long */
653                         if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
654                                 pthread_mutex_unlock(&emit_mutex);
655                                 goto sendit;
656                         }
657                 }
658                 flow = flows_emit;
659                 flows_emit = flows_emit->next;
660 #if ((DEBUG) & DEBUG_I)
661                 emit_queue--;
662 #endif          
663                 pthread_mutex_unlock(&emit_mutex);
664
665 #ifdef UPTIME_TRICK
666                 if (!emit_count) {
667                         gettime(&start_time);
668                         start_time.sec -= start_time_offset;
669                 }
670 #endif
671                 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
672                 mem_free(flow);
673                 emit_count++;
674                 if (emit_count == netflow->MaxFlows) {
675                 sendit:
676                         gettime(&emit_time);
677                         p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
678                         size = netflow->HeaderSize + emit_count * netflow->FlowSize;
679                         peer_rot_cur = 0;
680                         for (i = 0; i < npeers; i++) {
681                                 if (peers[i].type == PEER_MIRROR) goto sendreal;
682                                 if (peers[i].type == PEER_ROTATE) 
683                                         if (peer_rot_cur++ == peer_rot_work) {
684                                         sendreal:
685                                                 if (netflow->SeqOffset)
686                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
687                                                 ret = send(peers[i].sock, emit_packet, size, 0);
688                                                 if (ret < size) {
689 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
690                                                         my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
691                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
692 #endif
693                                                 }
694 #if ((DEBUG) & DEBUG_E)
695                                                 commaneelse {
696                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
697                                                                 emit_count, i + 1, peers[i].seq);
698                                                 }
699 #endif
700                                                 peers[i].seq += emit_count;
701
702                                                 /* Rate limit */
703                                                 if (emit_rate_bytes) {
704                                                         sent += size;
705                                                         delay = sent / emit_rate_bytes;
706                                                         if (delay) {
707                                                                 sent %= emit_rate_bytes;
708                                                                 timeout.tv_sec = 0;
709                                                                 timeout.tv_nsec = emit_rate_delay * delay;
710                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
711                                                         }
712                                                 }
713                                         }
714                         }
715                         if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
716                         emit_sequence += emit_count;
717                         emit_count = 0;
718 #if ((DEBUG) & DEBUG_I)
719                         emit_pkts++;
720 #endif
721                 }
722         }
723 }       
724
725 void *unpending_thread()
726 {
727         struct timeval now;
728         struct timespec timeout;
729 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
730         char logbuf[256];
731 #endif
732
733         setuser();
734
735         timeout.tv_nsec = 0;
736         pthread_mutex_lock(&unpending_mutex);
737
738         for (;;) {
739                 while (!(pending_tail->flags & FLOW_PENDING)) {
740                         gettimeofday(&now, 0);
741                         timeout.tv_sec = now.tv_sec + unpending_timeout;
742                         pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
743                 }
744
745 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
746                 *logbuf = 0;
747 #endif
748                 if (put_into(pending_tail, COPY_INTO
749 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
750                                 , logbuf
751 #endif
752                         ) < 0) {
753 #if ((DEBUG) & DEBUG_I)
754                         pkts_lost_unpending++;
755 #endif                          
756                 }
757
758 #if ((DEBUG) & DEBUG_U)
759                 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
760 #endif
761
762                 pending_tail->flags = 0;
763                 pending_tail = pending_tail->next;
764 #if ((DEBUG) & DEBUG_I)
765                 pkts_pending_done++;
766 #endif
767         }
768 }
769
770 void *scan_thread()
771 {
772 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
773         char logbuf[256];
774 #endif
775         int i;
776         struct Flow *flow, **flowpp;
777         struct Time now;
778         struct timespec timeout;
779
780         setuser();
781
782         timeout.tv_nsec = 0;
783         pthread_mutex_lock(&scan_mutex);
784
785         for (;;) {
786                 gettime(&now);
787                 timeout.tv_sec = now.sec + scan_interval;
788                 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
789
790                 gettime(&now);
791 #if ((DEBUG) & DEBUG_S)
792                 my_log(LOG_DEBUG, "S: %d", now.sec);
793 #endif
794                 for (i = 0; i < 1 << HASH_BITS ; i++) {
795                         pthread_mutex_lock(&flows_mutex[i]);
796                         flow = flows[i];
797                         flowpp = &flows[i];
798                         while (flow) {
799                                 if (flow->flags & FLOW_FRAG) {
800                                         /* Process fragmented flow */
801                                         if ((now.sec - flow->mtime.sec) > frag_lifetime) {
802                                                 /* Fragmented flow expired - put it into special chain */
803 #if ((DEBUG) & DEBUG_I)
804                                                 flows_fragmented--;
805                                                 flows_total--;
806 #endif
807                                                 *flowpp = flow->next;
808                                                 flow->id = 0;
809                                                 flow->flags &= ~FLOW_FRAG;
810                                                 flow->next = scan_frag_dreg;
811                                                 scan_frag_dreg = flow;
812                                                 flow = *flowpp;
813                                                 continue;
814                                         }
815                                 } else {
816                                         /* Flow is not frgamented */
817                                         if ((now.sec - flow->mtime.sec) > inactive_lifetime
818                                                 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
819                                                 /* Flow expired */
820 #if ((DEBUG) & DEBUG_S)
821                                                 my_log(LOG_DEBUG, "S: E %x", flow);
822 #endif
823 #if ((DEBUG) & DEBUG_I)
824                                                 flows_total--;
825 #endif
826                                                 *flowpp = flow->next;
827                                                 pthread_mutex_lock(&emit_mutex);
828                                                 flow->next = flows_emit;
829                                                 flows_emit = flow;
830 #if ((DEBUG) & DEBUG_I)
831                                                 emit_queue++;
832 #endif                          
833                                                 pthread_mutex_unlock(&emit_mutex);
834                                                 flow = *flowpp;
835                                                 continue;
836                                         }
837                                 }
838                                 flowpp = &flow->next;
839                                 flow = flow->next;
840                         } /* chain loop */
841                         pthread_mutex_unlock(&flows_mutex[i]);
842                 } /* hash loop */
843                 if (flows_emit) pthread_cond_signal(&emit_cond);
844
845                 while (scan_frag_dreg) {
846                         flow = scan_frag_dreg;
847                         scan_frag_dreg = flow->next;
848 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
849                         *logbuf = 0;
850 #endif
851                         put_into(flow, MOVE_INTO
852 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
853                                 , logbuf
854 #endif
855                         );
856 #if ((DEBUG) & DEBUG_S)
857                         my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
858 #endif
859                 }
860         }
861 }
862
863 void *cap_thread()
864 {
865         struct ulog_packet_msg *ulog_msg;
866         struct ip *nl;
867         void *tl;
868         struct Flow *flow;
869         int len, off_frag, psize;
870 #if ((DEBUG) & DEBUG_C)
871         char buf[64];
872         char logbuf[256];
873 #endif
874
875         setuser();
876
877         while (!killed) {
878                 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
879                 if (len <= 0) {
880                         my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
881                         continue;
882                 }
883                 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
884
885 #if ((DEBUG) & DEBUG_C)
886                         sprintf(logbuf, "C: %d", ulog_msg->data_len);
887 #endif
888
889                         nl = (void *) &ulog_msg->payload;
890                         psize = ulog_msg->data_len;
891
892                         /* Sanity check */
893                         if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
894 #if ((DEBUG) & DEBUG_C)
895                                 strcat(logbuf, " U");
896                                 my_log(LOG_DEBUG, "%s", logbuf);
897 #endif
898 #if ((DEBUG) & DEBUG_I)
899                                 pkts_ignored++;
900 #endif
901                                 continue;
902                         }
903
904                         if (pending_head->flags) {
905 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
906                                 my_log(LOG_ERR,
907 # if ((DEBUG) & DEBUG_C)
908                                         "%s %s %s", logbuf,
909 # else
910                                         "%s %s",
911 # endif
912                                         "pending queue full:", "packet lost");
913 #endif
914 #if ((DEBUG) & DEBUG_I)
915                                 pkts_lost_capture++;
916 #endif
917                                 goto done;
918                         }
919
920 #if ((DEBUG) & DEBUG_I)
921                         pkts_total++;
922 #endif
923
924                         flow = pending_head;
925
926                         /* ?FIXME? Add sanity check for ip_len? */
927                         flow->size = ntohs(nl->ip_len);
928 #if ((DEBUG) & DEBUG_I)
929                         size_total += flow->size;
930 #endif
931
932                         flow->sip = nl->ip_src;
933                         flow->dip = nl->ip_dst;
934                         flow->iif = snmp_index(ulog_msg->indev_name);
935                         flow->oif = snmp_index(ulog_msg->outdev_name);
936                         flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
937                         flow->proto = nl->ip_p;
938                         flow->id = 0;
939                         flow->tcp_flags = 0;
940                         flow->pkts = 1;
941                         flow->sizeF = 0;
942                         flow->sizeP = 0;
943                         /* Packets captured from OUTPUT table didn't contains valid timestamp */
944                         if (ulog_msg->timestamp_sec) {
945                                 flow->ctime.sec = ulog_msg->timestamp_sec;
946                                 flow->ctime.usec = ulog_msg->timestamp_usec;
947                         } else gettime(&flow->ctime);
948                         flow->mtime = flow->ctime;
949
950                         off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
951
952                         /*
953                         Offset (from network layer) to transport layer header/IP data
954                         IOW IP header size ;-)
955
956                         ?FIXME?
957                         Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
958                         */
959                         off_tl = nl->ip_hl << 2;
960                         tl = (void *) nl + off_tl;
961
962                         /* THIS packet data size: data_size = total_size - ip_header_size*4 */
963                         flow->sizeF = ntohs(nl->ip_len) - off_tl;
964                         psize -= off_tl;
965                         if ((signed) flow->sizeF < 0) flow->sizeF = 0;
966                         if (psize > (signed) flow->sizeF) psize = flow->sizeF;
967
968                         if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
969                                 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
970 #if ((DEBUG) & DEBUG_C)
971                                 strcat(logbuf, " F");
972 #endif
973 #if ((DEBUG) & DEBUG_I)
974                                 pkts_total_fragmented++;
975 #endif
976                                 flow->flags |= FLOW_FRAG;
977                                 flow->id = nl->ip_id;
978
979                                 if (!(ntohs(nl->ip_off) & IP_MF)) {
980                                         /* Packet whith IP_MF contains information about whole datagram size */
981                                         flow->flags |= FLOW_LASTFRAG;
982                                         /* size = frag_offset*8 + data_size */
983                                         flow->sizeP = off_frag + flow->sizeF;
984                                 }
985                         }
986
987 #if ((DEBUG) & DEBUG_C)
988                         sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
989                         strcat(logbuf, buf);
990                         sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
991                         strcat(logbuf, buf);
992 #endif
993
994                         /*
995                         Fortunately most interesting transport layer information fit
996                         into first 8 bytes of IP data field (minimal nonzero size).
997                         Thus we don't need actual packet reassembling to build whole
998                         transport layer data. We only check the fragment offset for
999                         zero value to find packet with this information.
1000                         */
1001                         if (!off_frag && psize >= 8) {
1002                                 switch (flow->proto) {
1003                                         case IPPROTO_TCP:
1004                                         case IPPROTO_UDP:
1005                                                 flow->sp = ((struct udphdr *)tl)->uh_sport;
1006                                                 flow->dp = ((struct udphdr *)tl)->uh_dport;
1007                                                 goto tl_known;
1008
1009 #ifdef ICMP_TRICK
1010                                         case IPPROTO_ICMP:
1011                                                 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1012                                                 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1013                                                 goto tl_known;
1014 #endif
1015 #ifdef ICMP_TRICK_CISCO
1016                                         case IPPROTO_ICMP:
1017                                                 flow->dp = *((int32_t *) tl);
1018                                                 goto tl_known;
1019 #endif
1020
1021                                         default:
1022                                                 /* Unknown transport layer */
1023 #if ((DEBUG) & DEBUG_C)
1024                                                 strcat(logbuf, " U");
1025 #endif
1026                                                 flow->sp = 0;
1027                                                 flow->dp = 0;
1028                                                 break;
1029
1030                                         tl_known:
1031 #if ((DEBUG) & DEBUG_C)
1032                                                 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1033                                                 strcat(logbuf, buf);
1034 #endif
1035                                                 flow->flags |= FLOW_TL;
1036                                 }
1037                         }
1038
1039                         /* Check for tcp flags presence (including CWR and ECE). */
1040                         if (flow->proto == IPPROTO_TCP
1041                                 && off_frag < 16
1042                                 && psize >= 16 - off_frag) {
1043                                 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1044 #if ((DEBUG) & DEBUG_C)
1045                                 sprintf(buf, " TCP:%x", flow->tcp_flags);
1046                                 strcat(logbuf, buf);
1047 #endif
1048                         }
1049
1050 #if ((DEBUG) & DEBUG_C)
1051                         sprintf(buf, " => %x", (unsigned) flow);
1052                         strcat(logbuf, buf);
1053                         my_log(LOG_DEBUG, "%s", logbuf);
1054 #endif
1055
1056 #if ((DEBUG) & DEBUG_I)
1057                         pkts_pending++;
1058                         pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1059                         if (pending_queue_trace < pending_queue_trace_candidate)
1060                                 pending_queue_trace = pending_queue_trace_candidate;
1061 #endif
1062
1063                         /* Flow complete - inform unpending_thread() about it */
1064                         pending_head->flags |= FLOW_PENDING;
1065                         pending_head = pending_head->next;
1066                 done:
1067                         pthread_cond_signal(&unpending_cond);
1068                 }
1069         }
1070         return 0;
1071 }
1072
1073 int main(int argc, char **argv)
1074 {
1075         char errpbuf[512];
1076         char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1077         int c, i, sock, memory_limit = 0;
1078         struct addrinfo hints, *res;
1079         struct sockaddr_in saddr;
1080         pthread_attr_t tattr;
1081         struct sigaction sigact;
1082         static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1083         struct timeval timeout;
1084
1085         sched_min = sched_get_priority_min(SCHED);
1086         sched_max = sched_get_priority_max(SCHED);
1087
1088         memset(&saddr, 0 , sizeof(saddr));
1089         memset(&hints, 0 , sizeof(hints));
1090         hints.ai_flags = AI_PASSIVE;
1091         hints.ai_family = AF_INET;
1092         hints.ai_socktype = SOCK_DGRAM;
1093
1094         /* Process command line options */
1095
1096         opterr = 0;
1097         while ((c = my_getopt(argc, argv, parms)) != -1) {
1098                 switch (c) {
1099                         case '?':
1100                                 usage();
1101
1102                         case 'h':
1103                                 usage();
1104                 }
1105         }
1106
1107         if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1108         if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1109         if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1110         if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1111         if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1112         if (parms[nflag].count) {
1113                 switch (atoi(parms[nflag].arg)) {
1114                         case 1:
1115                                 netflow = &NetFlow1;
1116                                 break;
1117
1118                         case 5:
1119                                 break;
1120
1121                         case 7:
1122                                 netflow = &NetFlow7;
1123                                 break;
1124
1125                         default:
1126                                 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1127                                 exit(1);
1128                 }
1129         }
1130         if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1131         if (parms[lflag].count) {
1132                 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1133                         *log_suffix++ = 0;
1134                         if (*log_suffix) {
1135                                 sprintf(errpbuf, "[%s]", log_suffix);
1136                                 strcat(ident, errpbuf);
1137                         }
1138                 }
1139                 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1140                 if (log_suffix) *--log_suffix = ':';
1141         }
1142         if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1143         err_malloc:
1144                 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1145                 exit(1);
1146         }
1147         sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1148         if (parms[qflag].count) {
1149                 pending_queue_length = atoi(parms[qflag].arg);
1150                 if (pending_queue_length < 1) {
1151                         fprintf(stderr, "Illegal %s\n", "pending queue length");
1152                         exit(1);
1153                 }
1154         }
1155         if (parms[rflag].count) {
1156                 schedp.sched_priority = atoi(parms[rflag].arg);
1157                 if (schedp.sched_priority
1158                         && (schedp.sched_priority < sched_min
1159                                 || schedp.sched_priority > sched_max)) {
1160                         fprintf(stderr, "Illegal %s\n", "realtime priority");
1161                         exit(1);
1162                 }
1163         }
1164         if (parms[Bflag].count) {
1165                 sockbufsize = atoi(parms[Bflag].arg) << 10;
1166         }
1167         if (parms[bflag].count) {
1168                 bulk_quantity = atoi(parms[bflag].arg);
1169                 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1170                         fprintf(stderr, "Illegal %s\n", "bulk size");
1171                         exit(1);
1172                 }
1173         }
1174         if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1175         if (parms[Xflag].count) {
1176                 for(i = 0; parms[Xflag].arg[i]; i++)
1177                         if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1178                 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1179                         goto err_malloc;
1180                 rule = strtok(parms[Xflag].arg, ":");
1181                 for (i = 0; rule; i++) {
1182                         snmp_rules[i].len = strlen(rule);
1183                         if (snmp_rules[i].len > IFNAMSIZ) {
1184                                 fprintf(stderr, "Illegal %s\n", "interface basename");
1185                                 exit(1);
1186                         }
1187                         strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1188                         if (!*(rule - 1)) *(rule - 1) = ',';
1189                         rule = strtok(NULL, ",");
1190                         if (!rule) {
1191                                 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1192                                 exit(1);
1193                         }
1194                         snmp_rules[i].base = atoi(rule);
1195                         *(rule - 1) = ':';
1196                         rule = strtok(NULL, ":");
1197                 }
1198                 nsnmp_rules = i;
1199         }
1200         if (parms[tflag].count)
1201                 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1202         if (parms[aflag].count) {
1203                 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1204                 bad_lhost:
1205                         fprintf(stderr, "Illegal %s\n", "source address");
1206                         exit(1);
1207                 } else {
1208                         saddr = *((struct sockaddr_in *) res->ai_addr);
1209                         freeaddrinfo(res);
1210                 }
1211         }
1212         if (parms[uflag].count) 
1213                 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1214                         fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1215                         exit(1);
1216                 }
1217
1218
1219         /* Process collectors parameters. Brrrr... :-[ */
1220
1221         npeers = argc - optind;
1222         if (npeers < 1) usage();
1223         if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1224         for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1225                 dhost = argv[i];
1226                 if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1227                 *dport++ = 0;
1228                 if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1229                         fprintf(stderr, "socket(): %s\n", strerror(errno));
1230                         exit(1);
1231                 }
1232                 peers[npeers].sock = sock;
1233                 peers[npeers].type = PEER_MIRROR;
1234                 peers[npeers].laddr = saddr;
1235                 peers[npeers].seq = 0;
1236                 if ((lhost = strchr(dport, '/'))) {
1237                         *lhost++ = 0;
1238                         if ((type = strchr(lhost, '/'))) {
1239                                 *type++ = 0;
1240                                 switch (*type) {
1241                                         case 0:
1242                                         case 'm':
1243                                                 break;
1244
1245                                         case 'r':
1246                                                 peers[npeers].type = PEER_ROTATE;
1247                                                 npeers_rot++;
1248                                                 break;
1249
1250                                         default:
1251                                                 goto bad_collector;
1252                                 }
1253                         }
1254                         if (*lhost) {
1255                                 if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1256                                 peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1257                                 freeaddrinfo(res);
1258                         }
1259                 }
1260                 if (bind(sock, (struct sockaddr *) &peers[npeers].laddr,
1261                                 sizeof(struct sockaddr_in))) {
1262                         fprintf(stderr, "bind(): %s\n", strerror(errno));
1263                         exit(1);
1264                 }
1265                 if (getaddrinfo(dhost, dport, &hints, &res)) {
1266                 bad_collector:
1267                         fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1268                         exit(1);
1269                 }
1270                 peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1271                 freeaddrinfo(res);
1272                 if (connect(sock, (struct sockaddr *) &peers[npeers].addr,
1273                                 sizeof(struct sockaddr_in))) {
1274                         fprintf(stderr, "connect(): %s\n", strerror(errno));
1275                         exit(1);
1276                 }
1277
1278                 /* Restore command line */
1279                 if (type) *--type = '/';
1280                 if (lhost) *--lhost = '/';
1281                 *--dport = ':';
1282         }
1283
1284         if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1285         ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1286         if (!ulog_handle) {
1287                 fprintf(stderr, "libipulog initialization error: %s",
1288                         ipulog_strerror(ipulog_errno));
1289                 exit(1);
1290         }
1291         if (sockbufsize)
1292                 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1293                         &sockbufsize, sizeof(sockbufsize)) < 0)
1294                         fprintf(stderr, "setsockopt(): %s", strerror(errno));
1295
1296         /* Daemonize (if log destination stdout-free) */
1297
1298         my_log_open(ident, verbosity, log_dest);
1299         if (!(log_dest & 2)) {
1300                 switch (fork()) {
1301                         case -1:
1302                                 fprintf(stderr, "fork(): %s", strerror(errno));
1303                                 exit(1);
1304
1305                         case 0:
1306                                 setsid();
1307                                 freopen("/dev/null", "r", stdin);
1308                                 freopen("/dev/null", "w", stdout);
1309                                 freopen("/dev/null", "w", stderr);
1310                                 break;
1311
1312                         default:
1313                                 exit(0);
1314                 }
1315         } else {
1316                 setvbuf(stdout, (char *)0, _IONBF, 0);
1317                 setvbuf(stderr, (char *)0, _IONBF, 0);
1318         }
1319
1320         pid = getpid();
1321         sprintf(errpbuf, "[%ld]", (long) pid);
1322         strcat(ident, errpbuf);
1323
1324         /* Initialization */
1325
1326         hash_init(); /* Actually for crc16 only */
1327         mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1328         for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1329
1330 #ifdef UPTIME_TRICK
1331         /* Hope 12 days is enough :-/ */
1332         start_time_offset = 1 << 20;
1333
1334         /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1335 #endif
1336         gettime(&start_time);
1337
1338         /*
1339         Build static pending queue as circular buffer.
1340         */
1341         if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1342         pending_tail = pending_head;
1343         for (i = pending_queue_length - 1; i--;) {
1344                 if (!(pending_tail->next = mem_alloc())) {
1345                 err_mem_alloc:
1346                         my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1347                         exit(1);
1348                 }
1349                 pending_tail = pending_tail->next;
1350         }
1351         pending_tail->next = pending_head;
1352         pending_tail = pending_head;
1353
1354         sigemptyset(&sig_mask);
1355         sigact.sa_handler = &sighandler;
1356         sigact.sa_mask = sig_mask;
1357         sigact.sa_flags = 0;
1358         sigaddset(&sig_mask, SIGTERM);
1359         sigaction(SIGTERM, &sigact, 0);
1360 #if ((DEBUG) & DEBUG_I)
1361         sigaddset(&sig_mask, SIGUSR1);
1362         sigaction(SIGUSR1, &sigact, 0);
1363 #endif
1364         if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1365                 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1366                 exit(1);
1367         }
1368
1369         my_log(LOG_INFO, "Starting %s...", VERSION);
1370
1371         if (parms[cflag].count) {
1372                 if (chdir(parms[cflag].arg) || chroot(".")) {
1373                         my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1374                         exit(1);
1375                 }
1376         }
1377
1378         schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1379         pthread_attr_init(&tattr);
1380         for (i = 0; i < THREADS - 1; i++) {
1381                 if (schedp.sched_priority > 0) {
1382                         if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1383                                 (pthread_attr_setschedparam(&tattr, &schedp))) {
1384                                 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1385                                 exit(1);
1386                         }
1387                 }
1388                 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1389                         my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1390                         exit(1);
1391                 }
1392                 pthread_detach(thid);
1393                 schedp.sched_priority++;
1394         }
1395
1396         if (pw) {
1397                 if (setgroups(0, NULL)) {
1398                         my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1399                         exit(1);
1400                 }
1401                 if (setregid(pw->pw_gid, pw->pw_gid)) {
1402                         my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1403                         exit(1);
1404                 }
1405                 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1406                         my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1407                         exit(1);
1408                 }
1409         }
1410
1411         if (!(pidfile = fopen(pidfilepath, "w")))
1412                 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1413         else {
1414                 fprintf(pidfile, "%ld\n", (long) pid);
1415                 fclose(pidfile);
1416         }
1417
1418         my_log(LOG_INFO, "pid: %d", pid);
1419         my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1420                 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1421                 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1422                 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1423                 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1424                 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1425                 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1426         for (i = 0; i < nsnmp_rules; i++) {
1427                 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1428                         i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1429         }
1430         for (i = 0; i < npeers; i++) {
1431                 switch (peers[i].type) {
1432                         case PEER_MIRROR:
1433                                 c = 'm';
1434                                 break;
1435                         case PEER_ROTATE:
1436                                 c = 'r';
1437                                 break;
1438                 }
1439                 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1440                 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1441                         inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1442         }
1443
1444         pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1445
1446         timeout.tv_usec = 0;
1447         while (!killed
1448                 || (total_elements - free_elements - pending_queue_length)
1449                 || emit_count
1450                 || pending_tail->flags) {
1451
1452                 if (!sigs) {
1453                         timeout.tv_sec = scan_interval;
1454                         select(0, 0, 0, 0, &timeout);
1455                 }
1456
1457                 if (sigs & SIGTERM_MASK && !killed) {
1458                         sigs &= ~SIGTERM_MASK;
1459                         my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1460                         scan_interval = 1;
1461                         frag_lifetime = -1;
1462                         active_lifetime = -1;
1463                         inactive_lifetime = -1;
1464                         emit_timeout = 1;
1465                         unpending_timeout = 1;
1466                         killed = 1;
1467                         pthread_cond_signal(&scan_cond);
1468                         pthread_cond_signal(&unpending_cond);
1469                 }
1470
1471 #if ((DEBUG) & DEBUG_I)
1472                 if (sigs & SIGUSR1_MASK) {
1473                         sigs &= ~SIGUSR1_MASK;
1474                         info_debug();
1475                 }
1476 #endif
1477         }
1478         remove(pidfilepath);
1479 #if ((DEBUG) & DEBUG_I)
1480         info_debug();
1481 #endif
1482         my_log(LOG_INFO, "Done.");
1483 #ifdef WALL
1484         return 0;
1485 #endif
1486 }