Re-import of fprobe-ulog
[fprobe-ulog.git] / trunk / src / fprobe-ulog.c
1 /*
2         Copyright (C) Slava Astashonok <sla@0n.ru>
3
4         This program is free software; you can redistribute it and/or
5         modify it under the terms of the GNU General Public License.
6
7         $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
8 */
9
10 #include <common.h>
11
12 /* stdout, stderr, freopen() */
13 #include <stdio.h>
14
15 /* atoi(), exit() */
16 #include <stdlib.h>
17
18 /* getopt(), alarm(), getpid(), sedsid(), chdir() */
19 #include <unistd.h>
20
21 /* strerror() */
22 #include <string.h>
23
24 /* sig*() */
25 #include <signal.h>
26
27 #include <libipulog/libipulog.h>
28 struct ipulog_handle {
29         int fd;
30         u_int8_t blocking;
31         struct sockaddr_nl local;
32         struct sockaddr_nl peer;
33         struct nlmsghdr* last_nlhdr;
34 };
35
36 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
37 #include <sys/types.h>
38 #include <netinet/in_systm.h>
39 #include <sys/socket.h>
40 #include <netinet/in.h>
41 #include <arpa/inet.h>
42 #include <netinet/ip.h>
43 #include <netinet/tcp.h>
44 #include <netinet/udp.h>
45 #include <netinet/ip_icmp.h>
46 #include <net/if.h>
47
48 #include <sys/param.h>
49 #include <pwd.h>
50 #ifdef OS_LINUX
51 #include <grp.h>
52 #endif
53
54 /* pthread_*() */
55 #include <pthread.h>
56
57 /* errno */
58 #include <errno.h>
59
60 /* getaddrinfo() */
61 #include <netdb.h>
62
63 /* nanosleep() */
64 #include <time.h>
65
66 /* gettimeofday() */
67 #include <sys/time.h>
68
69 /* scheduling */
70 #include <sched.h>
71
72 /* select() (POSIX)*/
73 #include <sys/select.h>
74
75 /* open() */
76 #include <sys/stat.h>
77 #include <fcntl.h>
78
79 #include <fprobe-ulog.h>
80 #include <my_log.h>
81 #include <my_getopt.h>
82 #include <netflow.h>
83 #include <hash.h>
84 #include <mem.h>
85
86 enum {
87         aflag,
88         Bflag,
89         bflag,
90         cflag,
91         dflag,
92         eflag,
93         fflag,
94         gflag,
95         hflag,
96         lflag,
97         mflag,
98         Mflag,
99         nflag,
100         qflag,
101         rflag,
102         sflag,
103         tflag,
104         Uflag,
105         uflag,
106         vflag,
107         Xflag,
108 };
109
110 static struct getopt_parms parms[] = {
111         {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
112         {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
113         {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
114         {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
115         {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
116         {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
117         {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
118         {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
119         {'h', 0, 0, 0},
120         {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
121         {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
122         {'M', 0, 0, 0},
123         {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
124         {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
125         {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
126         {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
127         {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
128         {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
129         {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
130         {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
131         {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
132         {0, 0, 0, 0}
133 };
134
135 extern char *optarg;
136 extern int optind, opterr, optopt;
137 extern int errno;
138
139 extern struct NetFlow NetFlow1;
140 extern struct NetFlow NetFlow5;
141 extern struct NetFlow NetFlow7;
142
143 #define mark_is_tos parms[Mflag].count
144 static unsigned scan_interval = 5;
145 static int frag_lifetime = 30;
146 static int inactive_lifetime = 60;
147 static int active_lifetime = 300;
148 static int sockbufsize;
149 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
150 #if (MEM_BITS == 0) || (MEM_BITS == 16)
151 #define BULK_QUANTITY 10000
152 #else
153 #define BULK_QUANTITY 200
154 #endif
155 static unsigned bulk_quantity = BULK_QUANTITY;
156 static unsigned pending_queue_length = 100;
157 static struct NetFlow *netflow = &NetFlow5;
158 static unsigned verbosity = 6;
159 static unsigned log_dest = MY_LOG_SYSLOG;
160 static struct Time start_time;
161 static long start_time_offset;
162 static int off_tl;
163 /* From mem.c */
164 extern unsigned total_elements;
165 extern unsigned free_elements;
166 extern unsigned total_memory;
167 #if ((DEBUG) & DEBUG_I)
168 static unsigned emit_pkts, emit_queue;
169 static uint64_t size_total;
170 static unsigned pkts_total, pkts_total_fragmented;
171 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
172 static unsigned pkts_pending, pkts_pending_done;
173 static unsigned pending_queue_trace, pending_queue_trace_candidate;
174 static unsigned flows_total, flows_fragmented;
175 #endif
176 static unsigned emit_count;
177 static uint32_t emit_sequence;
178 static unsigned emit_rate_bytes, emit_rate_delay;
179 static struct Time emit_time;
180 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
181 static pthread_t thid;
182 static sigset_t sig_mask;
183 static struct sched_param schedp;
184 static int sched_min, sched_max;
185 static int npeers, npeers_rot;
186 static struct peer *peers;
187 static int sigs;
188
189 static struct Flow *flows[1 << HASH_BITS];
190 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
191
192 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
193 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
194
195 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
196 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
197 static struct Flow *pending_head, *pending_tail;
198 static struct Flow *scan_frag_dreg;
199
200 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
201 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
202 static struct Flow *flows_emit;
203
204 static char ident[256] = "fprobe-ulog";
205 static FILE *pidfile;
206 static char *pidfilepath;
207 static pid_t pid;
208 static int killed;
209 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
210 static struct ipulog_handle *ulog_handle;
211 static uint32_t ulog_gmask = 1;
212 static char *cap_buf;
213 static int nsnmp_rules;
214 static struct snmp_rule *snmp_rules;
215 static struct passwd *pw = 0;
216
217 void usage()
218 {
219         fprintf(stdout,
220                 "fprobe-ulog: a NetFlow probe. Version %s\n"
221                 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
222                 "\n"
223                 "-h\t\tDisplay this help\n"
224                 "-U <mask>\tULOG group bitwise mask [1]\n"
225                 "-s <seconds>\tHow often scan for expired flows [5]\n"
226                 "-g <seconds>\tFragmented flow lifetime [30]\n"
227                 "-d <seconds>\tIdle flow lifetime (inactive timer) [60]\n"
228                 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
229                 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
230                 "-a <address>\tUse <address> as source for NetFlow flow\n"
231                 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
232                 "-M\t\tUse netfilter mark value as ToS flag\n"
233                 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
234                 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
235                 "-q <flows>\tPending queue length [100]\n"
236                 "-B <kilobytes>\tKernel capture buffer size [0]\n"
237                 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
238                 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
239                 "-c <directory>\tDirectory to chroot to\n"
240                 "-u <user>\tUser to run as\n"
241                 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
242                 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
243                 "-y <remote:port>\tAddress of the NetFlow collector\n",
244                 "-f <writable file>\tFile to write data into\n"
245                 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
246         exit(0);
247 }
248
249 #if ((DEBUG) & DEBUG_I)
250 void info_debug()
251 {
252         my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
253                 pkts_total, pkts_total_fragmented, size_total,
254                 pkts_pending - pkts_pending_done, pending_queue_trace);
255         my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
256                 pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
257         my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
258                 flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
259         my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
260                 total_elements, free_elements, total_memory);
261 }
262 #endif
263
264 void sighandler(int sig)
265 {
266         switch (sig) {
267                 case SIGTERM:
268                         sigs |= SIGTERM_MASK;
269                         break;
270 #if ((DEBUG) & DEBUG_I)
271                 case SIGUSR1:
272                         sigs |= SIGUSR1_MASK;
273                         break;
274 #endif
275         }
276 }
277
278 void gettime(struct Time *now)
279 {
280         struct timeval t;
281
282         gettimeofday(&t, 0);
283         now->sec = t.tv_sec;
284         now->usec = t.tv_usec;
285 }
286
287 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
288 {
289         return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
290 }
291
292 /* Uptime in miliseconds */
293 uint32_t getuptime(struct Time *t)
294 {
295         /* Maximum uptime is about 49/2 days */
296         return cmpmtime(t, &start_time);
297 }
298
299 hash_t hash_flow(struct Flow *flow)
300 {
301         if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
302         else return hash(flow, sizeof(struct Flow_TL));
303 }
304
305 uint16_t snmp_index(char *name) {
306         uint32_t i;
307
308         if (!*name) return 0;
309
310         for (i = 0; (int) i < nsnmp_rules; i++) {
311                 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
312                 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
313         }
314
315         if ((i = if_nametoindex(name))) return i;
316
317         return -1;
318 }
319
320 inline void copy_flow(struct Flow *src, struct Flow *dst)
321 {
322         dst->iif = src->iif;
323         dst->oif = src->oif;
324         dst->sip = src->sip;
325         dst->dip = src->dip;
326         dst->tos = src->tos;
327         dst->proto = src->proto;
328         dst->tcp_flags = src->tcp_flags;
329         dst->id = src->id;
330         dst->sp = src->sp;
331         dst->dp = src->dp;
332         dst->pkts = src->pkts;
333         dst->size = src->size;
334         dst->sizeF = src->sizeF;
335         dst->sizeP = src->sizeP;
336         dst->ctime = src->ctime;
337         dst->mtime = src->mtime;
338         dst->flags = src->flags;
339 }
340
341 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
342 {
343         struct Flow **flowpp;
344
345 #ifdef WALL
346         flowpp = 0;
347 #endif
348
349         if (prev) flowpp = *prev;
350
351         while (where) {
352                 if (where->sip.s_addr == what->sip.s_addr
353                         && where->dip.s_addr == what->dip.s_addr
354                         && where->proto == what->proto) {
355                         switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
356                                 case 0:
357                                         /* Both unfragmented */
358                                         if ((what->sp == where->sp)
359                                                 && (what->dp == where->dp)) goto done;
360                                         break;
361                                 case 2:
362                                         /* Both fragmented */
363                                         if (where->id == what->id) goto done;
364                                         break;
365                         }
366                 }
367                 flowpp = &where->next;
368                 where = where->next;
369         }
370 done:
371         if (prev) *prev = flowpp;
372         return where;
373 }
374
375 int put_into(struct Flow *flow, int flag
376 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
377         , char *logbuf
378 #endif
379 )
380 {
381         int ret = 0;
382         hash_t h;
383         struct Flow *flown, **flowpp;
384 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
385         char buf[64];
386 #endif
387
388         h = hash_flow(flow);
389 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
390         sprintf(buf, " %x H:%04x", (unsigned) flow, h);
391         strcat(logbuf, buf);
392 #endif
393         pthread_mutex_lock(&flows_mutex[h]);
394         flowpp = &flows[h];
395         if (!(flown = find(flows[h], flow, &flowpp))) {
396                 /* No suitable flow found - add */
397                 if (flag == COPY_INTO) {
398                         if ((flown = mem_alloc())) {
399                                 copy_flow(flow, flown);
400                                 flow = flown;
401                         } else {
402 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
403                                 my_log(LOG_ERR, "%s %s. %s",
404                                         "mem_alloc():", strerror(errno), "packet lost");
405 #endif
406                                 return -1;
407                         }
408                 }
409                 flow->next = flows[h];
410                 flows[h] = flow;
411 #if ((DEBUG) & DEBUG_I)
412                 flows_total++;
413                 if (flow->flags & FLOW_FRAG) flows_fragmented++;
414 #endif
415 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
416                 if (flown) {
417                         sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
418                         strcat(logbuf, buf);
419                 }
420 #endif
421         } else {
422                 /* Found suitable flow - update */
423 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
424                 sprintf(buf, " +> %x", (unsigned) flown);
425                 strcat(logbuf, buf);
426 #endif
427                 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
428                         flown->mtime = flow->mtime;
429                 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
430                         flown->ctime = flow->ctime;
431                 flown->tcp_flags |= flow->tcp_flags;
432                 flown->size += flow->size;
433                 flown->pkts += flow->pkts;
434                 if (flow->flags & FLOW_FRAG) {
435                         /* Fragmented flow require some additional work */
436                         if (flow->flags & FLOW_TL) {
437                                 /*
438                                 ?FIXME?
439                                 Several packets with FLOW_TL (attack)
440                                 */
441                                 flown->sp = flow->sp;
442                                 flown->dp = flow->dp;
443                         }
444                         if (flow->flags & FLOW_LASTFRAG) {
445                                 /*
446                                 ?FIXME?
447                                 Several packets with FLOW_LASTFRAG (attack)
448                                 */
449                                 flown->sizeP = flow->sizeP;
450                         }
451                         flown->flags |= flow->flags;
452                         flown->sizeF += flow->sizeF;
453                         if ((flown->flags & FLOW_LASTFRAG)
454                                 && (flown->sizeF >= flown->sizeP)) {
455                                 /* All fragments received - flow reassembled */
456                                 *flowpp = flown->next;
457                                 pthread_mutex_unlock(&flows_mutex[h]);
458 #if ((DEBUG) & DEBUG_I)
459                                 flows_total--;
460                                 flows_fragmented--;
461 #endif
462                                 flown->id = 0;
463                                 flown->flags &= ~FLOW_FRAG;
464 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
465                                 strcat(logbuf," R");
466 #endif
467                                 ret = put_into(flown, MOVE_INTO
468 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
469                                                 , logbuf
470 #endif
471                                         );
472                         }
473                 }
474                 if (flag == MOVE_INTO) mem_free(flow);
475         }
476         pthread_mutex_unlock(&flows_mutex[h]);
477         return ret;
478 }
479
480 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
481 {
482         int i;
483
484         for (i = 0; i < fields; i++) {
485 #if ((DEBUG) & DEBUG_F)
486                 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
487 #endif
488                 switch (format[i]) {
489                         case NETFLOW_IPV4_SRC_ADDR:
490                                 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
491                                 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
492                                 break;
493
494                         case NETFLOW_IPV4_DST_ADDR:
495                                 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
496                                 p += NETFLOW_IPV4_DST_ADDR_SIZE;
497                                 break;
498
499                         case NETFLOW_INPUT_SNMP:
500                                 *((uint16_t *) p) = htons(flow->iif);
501                                 p += NETFLOW_INPUT_SNMP_SIZE;
502                                 break;
503
504                         case NETFLOW_OUTPUT_SNMP:
505                                 *((uint16_t *) p) = htons(flow->oif);
506                                 p += NETFLOW_OUTPUT_SNMP_SIZE;
507                                 break;
508
509                         case NETFLOW_PKTS_32:
510                                 *((uint32_t *) p) = htonl(flow->pkts);
511                                 p += NETFLOW_PKTS_32_SIZE;
512                                 break;
513
514                         case NETFLOW_BYTES_32:
515                                 *((uint32_t *) p) = htonl(flow->size);
516                                 p += NETFLOW_BYTES_32_SIZE;
517                                 break;
518
519                         case NETFLOW_FIRST_SWITCHED:
520                                 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
521                                 p += NETFLOW_FIRST_SWITCHED_SIZE;
522                                 break;
523
524                         case NETFLOW_LAST_SWITCHED:
525                                 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
526                                 p += NETFLOW_LAST_SWITCHED_SIZE;
527                                 break;
528
529                         case NETFLOW_L4_SRC_PORT:
530                                 *((uint16_t *) p) = flow->sp;
531                                 p += NETFLOW_L4_SRC_PORT_SIZE;
532                                 break;
533
534                         case NETFLOW_L4_DST_PORT:
535                                 *((uint16_t *) p) = flow->dp;
536                                 p += NETFLOW_L4_DST_PORT_SIZE;
537                                 break;
538
539                         case NETFLOW_PROT:
540                                 *((uint8_t *) p) = flow->proto;
541                                 p += NETFLOW_PROT_SIZE;
542                                 break;
543
544                         case NETFLOW_SRC_TOS:
545                                 *((uint8_t *) p) = flow->tos;
546                                 p += NETFLOW_SRC_TOS_SIZE;
547                                 break;
548
549                         case NETFLOW_TCP_FLAGS:
550                                 *((uint8_t *) p) = flow->tcp_flags;
551                                 p += NETFLOW_TCP_FLAGS_SIZE;
552                                 break;
553
554                         case NETFLOW_VERSION:
555                                 *((uint16_t *) p) = htons(netflow->Version);
556                                 p += NETFLOW_VERSION_SIZE;
557                                 break;
558
559                         case NETFLOW_COUNT:
560                                 *((uint16_t *) p) = htons(emit_count);
561                                 p += NETFLOW_COUNT_SIZE;
562                                 break;
563
564                         case NETFLOW_UPTIME:
565                                 *((uint32_t *) p) = htonl(getuptime(&emit_time));
566                                 p += NETFLOW_UPTIME_SIZE;
567                                 break;
568
569                         case NETFLOW_UNIX_SECS:
570                                 *((uint32_t *) p) = htonl(emit_time.sec);
571                                 p += NETFLOW_UNIX_SECS_SIZE;
572                                 break;
573
574                         case NETFLOW_UNIX_NSECS:
575                                 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
576                                 p += NETFLOW_UNIX_NSECS_SIZE;
577                                 break;
578
579                         case NETFLOW_FLOW_SEQUENCE:
580                                 //*((uint32_t *) p) = htonl(emit_sequence);
581                                 *((uint32_t *) p) = 0;
582                                 p += NETFLOW_FLOW_SEQUENCE_SIZE;
583                                 break;
584
585                         case NETFLOW_PAD8:
586                         /* Unsupported (uint8_t) */
587                         case NETFLOW_ENGINE_TYPE:
588                         case NETFLOW_ENGINE_ID:
589                         case NETFLOW_FLAGS7_1:
590                         case NETFLOW_SRC_MASK:
591                         case NETFLOW_DST_MASK:
592                                 *((uint8_t *) p) = 0;
593                                 p += NETFLOW_PAD8_SIZE;
594                                 break;
595
596                         case NETFLOW_PAD16:
597                         /* Unsupported (uint16_t) */
598                         case NETFLOW_SRC_AS:
599                         case NETFLOW_DST_AS:
600                         case NETFLOW_FLAGS7_2:
601                                 *((uint16_t *) p) = 0;
602                                 p += NETFLOW_PAD16_SIZE;
603                                 break;
604
605                         case NETFLOW_PAD32:
606                         /* Unsupported (uint32_t) */
607                         case NETFLOW_IPV4_NEXT_HOP:
608                         case NETFLOW_ROUTER_SC:
609                                 *((uint32_t *) p) = 0;
610                                 p += NETFLOW_PAD32_SIZE;
611                                 break;
612
613                         default:
614                                 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
615                                         format, i, format[i]);
616                                 exit(1);
617                 }
618         }
619 #if ((DEBUG) & DEBUG_F)
620         my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
621 #endif
622         return p;
623 }
624
625 void setuser() {
626         /*
627         Workaround for clone()-based threads
628         Try to change EUID independently of main thread
629         */
630         if (pw) {
631                 setgroups(0, NULL);
632                 setregid(pw->pw_gid, pw->pw_gid);
633                 setreuid(pw->pw_uid, pw->pw_uid);
634         }
635 }
636
637 void *emit_thread()
638 {
639         struct Flow *flow;
640         void *p;
641         struct timeval now;
642         struct timespec timeout;
643         int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
644
645         p = (void *) &emit_packet + netflow->HeaderSize;
646         timeout.tv_nsec = 0;
647
648         setuser();
649
650         for (;;) {
651                 pthread_mutex_lock(&emit_mutex);
652                 while (!flows_emit) {
653                         gettimeofday(&now, 0);
654                         timeout.tv_sec = now.tv_sec + emit_timeout;
655                         /* Do not wait until emit_packet will filled - it may be too long */
656                         if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
657                                 pthread_mutex_unlock(&emit_mutex);
658                                 goto sendit;
659                         }
660                 }
661                 flow = flows_emit;
662                 flows_emit = flows_emit->next;
663 #if ((DEBUG) & DEBUG_I)
664                 emit_queue--;
665 #endif          
666                 pthread_mutex_unlock(&emit_mutex);
667
668 #ifdef UPTIME_TRICK
669                 if (!emit_count) {
670                         gettime(&start_time);
671                         start_time.sec -= start_time_offset;
672                 }
673 #endif
674                 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
675                 mem_free(flow);
676                 emit_count++;
677                 if (emit_count == netflow->MaxFlows) {
678                 sendit:
679                         gettime(&emit_time);
680                         p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
681                         size = netflow->HeaderSize + emit_count * netflow->FlowSize;
682                         peer_rot_cur = 0;
683                         for (i = 0; i < npeers; i++) {
684                                 if (peers[i].type == PEER_FILE) {
685                                         printf( "Here\n");
686                                                 if (netflow->SeqOffset)
687                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
688 #define MESSAGES
689                                                 ret = write(peers[i].write_fd, emit_packet, size);
690                                                 if (ret < size) {
691 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
692                                                         my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
693                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
694 #endif
695 #undef MESSAGES
696                                                 }
697 #if ((DEBUG) & DEBUG_E)
698                                                 commaneelse {
699                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
700                                                                 emit_count, i + 1, peers[i].seq);
701                                                 }
702 #endif
703                                                 peers[i].seq += emit_count;
704
705                                                 /* Rate limit */
706                                                 if (emit_rate_bytes) {
707                                                         sent += size;
708                                                         delay = sent / emit_rate_bytes;
709                                                         if (delay) {
710                                                                 sent %= emit_rate_bytes;
711                                                                 timeout.tv_sec = 0;
712                                                                 timeout.tv_nsec = emit_rate_delay * delay;
713                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
714                                                         }
715                                                 }
716                                         }
717                                 else
718                                 if (peers[i].type == PEER_MIRROR) goto sendreal;
719                                 else
720                                 if (peers[i].type == PEER_ROTATE) 
721                                         if (peer_rot_cur++ == peer_rot_work) {
722                                         sendreal:
723                                                 if (netflow->SeqOffset)
724                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
725                                                 ret = send(peers[i].write_fd, emit_packet, size, 0);
726                                                 if (ret < size) {
727 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
728                                                         my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
729                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
730 #endif
731                                                 }
732 #if ((DEBUG) & DEBUG_E)
733                                                 commaneelse {
734                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
735                                                                 emit_count, i + 1, peers[i].seq);
736                                                 }
737 #endif
738                                                 peers[i].seq += emit_count;
739
740                                                 /* Rate limit */
741                                                 if (emit_rate_bytes) {
742                                                         sent += size;
743                                                         delay = sent / emit_rate_bytes;
744                                                         if (delay) {
745                                                                 sent %= emit_rate_bytes;
746                                                                 timeout.tv_sec = 0;
747                                                                 timeout.tv_nsec = emit_rate_delay * delay;
748                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
749                                                         }
750                                                 }
751                                         }
752                         }
753                         if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
754                         emit_sequence += emit_count;
755                         emit_count = 0;
756 #if ((DEBUG) & DEBUG_I)
757                         emit_pkts++;
758 #endif
759                 }
760         }
761 }       
762
763 void *unpending_thread()
764 {
765         struct timeval now;
766         struct timespec timeout;
767 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
768         char logbuf[256];
769 #endif
770
771         setuser();
772
773         timeout.tv_nsec = 0;
774         pthread_mutex_lock(&unpending_mutex);
775
776         for (;;) {
777                 while (!(pending_tail->flags & FLOW_PENDING)) {
778                         gettimeofday(&now, 0);
779                         timeout.tv_sec = now.tv_sec + unpending_timeout;
780                         pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
781                 }
782
783 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
784                 *logbuf = 0;
785 #endif
786                 if (put_into(pending_tail, COPY_INTO
787 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
788                                 , logbuf
789 #endif
790                         ) < 0) {
791 #if ((DEBUG) & DEBUG_I)
792                         pkts_lost_unpending++;
793 #endif                          
794                 }
795
796 #if ((DEBUG) & DEBUG_U)
797                 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
798 #endif
799
800                 pending_tail->flags = 0;
801                 pending_tail = pending_tail->next;
802 #if ((DEBUG) & DEBUG_I)
803                 pkts_pending_done++;
804 #endif
805         }
806 }
807
808 void *scan_thread()
809 {
810 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
811         char logbuf[256];
812 #endif
813         int i;
814         struct Flow *flow, **flowpp;
815         struct Time now;
816         struct timespec timeout;
817
818         setuser();
819
820         timeout.tv_nsec = 0;
821         pthread_mutex_lock(&scan_mutex);
822
823         for (;;) {
824                 gettime(&now);
825                 timeout.tv_sec = now.sec + scan_interval;
826                 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
827
828                 gettime(&now);
829 #if ((DEBUG) & DEBUG_S)
830                 my_log(LOG_DEBUG, "S: %d", now.sec);
831 #endif
832                 for (i = 0; i < 1 << HASH_BITS ; i++) {
833                         pthread_mutex_lock(&flows_mutex[i]);
834                         flow = flows[i];
835                         flowpp = &flows[i];
836                         while (flow) {
837                                 if (flow->flags & FLOW_FRAG) {
838                                         /* Process fragmented flow */
839                                         if ((now.sec - flow->mtime.sec) > frag_lifetime) {
840                                                 /* Fragmented flow expired - put it into special chain */
841 #if ((DEBUG) & DEBUG_I)
842                                                 flows_fragmented--;
843                                                 flows_total--;
844 #endif
845                                                 *flowpp = flow->next;
846                                                 flow->id = 0;
847                                                 flow->flags &= ~FLOW_FRAG;
848                                                 flow->next = scan_frag_dreg;
849                                                 scan_frag_dreg = flow;
850                                                 flow = *flowpp;
851                                                 continue;
852                                         }
853                                 } else {
854                                         /* Flow is not frgamented */
855                                         if ((now.sec - flow->mtime.sec) > inactive_lifetime
856                                                 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
857                                                 /* Flow expired */
858 #if ((DEBUG) & DEBUG_S)
859                                                 my_log(LOG_DEBUG, "S: E %x", flow);
860 #endif
861 #if ((DEBUG) & DEBUG_I)
862                                                 flows_total--;
863 #endif
864                                                 *flowpp = flow->next;
865                                                 pthread_mutex_lock(&emit_mutex);
866                                                 flow->next = flows_emit;
867                                                 flows_emit = flow;
868 #if ((DEBUG) & DEBUG_I)
869                                                 emit_queue++;
870 #endif                          
871                                                 pthread_mutex_unlock(&emit_mutex);
872                                                 flow = *flowpp;
873                                                 continue;
874                                         }
875                                 }
876                                 flowpp = &flow->next;
877                                 flow = flow->next;
878                         } /* chain loop */
879                         pthread_mutex_unlock(&flows_mutex[i]);
880                 } /* hash loop */
881                 if (flows_emit) pthread_cond_signal(&emit_cond);
882
883                 while (scan_frag_dreg) {
884                         flow = scan_frag_dreg;
885                         scan_frag_dreg = flow->next;
886 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
887                         *logbuf = 0;
888 #endif
889                         put_into(flow, MOVE_INTO
890 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
891                                 , logbuf
892 #endif
893                         );
894 #if ((DEBUG) & DEBUG_S)
895                         my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
896 #endif
897                 }
898         }
899 }
900
901 void *cap_thread()
902 {
903         struct ulog_packet_msg *ulog_msg;
904         struct ip *nl;
905         void *tl;
906         struct Flow *flow;
907         int len, off_frag, psize;
908 #if ((DEBUG) & DEBUG_C)
909         char buf[64];
910         char logbuf[256];
911 #endif
912
913         setuser();
914
915         while (!killed) {
916                 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
917                 if (len <= 0) {
918                         my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
919                         continue;
920                 }
921                 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
922
923 #if ((DEBUG) & DEBUG_C)
924                         sprintf(logbuf, "C: %d", ulog_msg->data_len);
925 #endif
926
927                         nl = (void *) &ulog_msg->payload;
928                         psize = ulog_msg->data_len;
929
930                         /* Sanity check */
931                         if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
932 #if ((DEBUG) & DEBUG_C)
933                                 strcat(logbuf, " U");
934                                 my_log(LOG_DEBUG, "%s", logbuf);
935 #endif
936 #if ((DEBUG) & DEBUG_I)
937                                 pkts_ignored++;
938 #endif
939                                 continue;
940                         }
941
942                         if (pending_head->flags) {
943 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
944                                 my_log(LOG_ERR,
945 # if ((DEBUG) & DEBUG_C)
946                                         "%s %s %s", logbuf,
947 # else
948                                         "%s %s",
949 # endif
950                                         "pending queue full:", "packet lost");
951 #endif
952 #if ((DEBUG) & DEBUG_I)
953                                 pkts_lost_capture++;
954 #endif
955                                 goto done;
956                         }
957
958 #if ((DEBUG) & DEBUG_I)
959                         pkts_total++;
960 #endif
961
962                         flow = pending_head;
963
964                         /* ?FIXME? Add sanity check for ip_len? */
965                         flow->size = ntohs(nl->ip_len);
966 #if ((DEBUG) & DEBUG_I)
967                         size_total += flow->size;
968 #endif
969
970                         flow->sip = nl->ip_src;
971                         flow->dip = nl->ip_dst;
972                         flow->iif = snmp_index(ulog_msg->indev_name);
973                         flow->oif = snmp_index(ulog_msg->outdev_name);
974                         flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
975                         flow->proto = nl->ip_p;
976                         flow->id = 0;
977                         flow->tcp_flags = 0;
978                         flow->pkts = 1;
979                         flow->sizeF = 0;
980                         flow->sizeP = 0;
981                         /* Packets captured from OUTPUT table didn't contains valid timestamp */
982                         if (ulog_msg->timestamp_sec) {
983                                 flow->ctime.sec = ulog_msg->timestamp_sec;
984                                 flow->ctime.usec = ulog_msg->timestamp_usec;
985                         } else gettime(&flow->ctime);
986                         flow->mtime = flow->ctime;
987
988                         off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
989
990                         /*
991                         Offset (from network layer) to transport layer header/IP data
992                         IOW IP header size ;-)
993
994                         ?FIXME?
995                         Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
996                         */
997                         off_tl = nl->ip_hl << 2;
998                         tl = (void *) nl + off_tl;
999
1000                         /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1001                         flow->sizeF = ntohs(nl->ip_len) - off_tl;
1002                         psize -= off_tl;
1003                         if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1004                         if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1005
1006                         if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1007                                 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1008 #if ((DEBUG) & DEBUG_C)
1009                                 strcat(logbuf, " F");
1010 #endif
1011 #if ((DEBUG) & DEBUG_I)
1012                                 pkts_total_fragmented++;
1013 #endif
1014                                 flow->flags |= FLOW_FRAG;
1015                                 flow->id = nl->ip_id;
1016
1017                                 if (!(ntohs(nl->ip_off) & IP_MF)) {
1018                                         /* Packet whith IP_MF contains information about whole datagram size */
1019                                         flow->flags |= FLOW_LASTFRAG;
1020                                         /* size = frag_offset*8 + data_size */
1021                                         flow->sizeP = off_frag + flow->sizeF;
1022                                 }
1023                         }
1024
1025 #if ((DEBUG) & DEBUG_C)
1026                         sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1027                         strcat(logbuf, buf);
1028                         sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1029                         strcat(logbuf, buf);
1030 #endif
1031
1032                         /*
1033                         Fortunately most interesting transport layer information fit
1034                         into first 8 bytes of IP data field (minimal nonzero size).
1035                         Thus we don't need actual packet reassembling to build whole
1036                         transport layer data. We only check the fragment offset for
1037                         zero value to find packet with this information.
1038                         */
1039                         if (!off_frag && psize >= 8) {
1040                                 switch (flow->proto) {
1041                                         case IPPROTO_TCP:
1042                                         case IPPROTO_UDP:
1043                                                 flow->sp = ((struct udphdr *)tl)->uh_sport;
1044                                                 flow->dp = ((struct udphdr *)tl)->uh_dport;
1045                                                 goto tl_known;
1046
1047 #ifdef ICMP_TRICK
1048                                         case IPPROTO_ICMP:
1049                                                 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1050                                                 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1051                                                 goto tl_known;
1052 #endif
1053 #ifdef ICMP_TRICK_CISCO
1054                                         case IPPROTO_ICMP:
1055                                                 flow->dp = *((int32_t *) tl);
1056                                                 goto tl_known;
1057 #endif
1058
1059                                         default:
1060                                                 /* Unknown transport layer */
1061 #if ((DEBUG) & DEBUG_C)
1062                                                 strcat(logbuf, " U");
1063 #endif
1064                                                 flow->sp = 0;
1065                                                 flow->dp = 0;
1066                                                 break;
1067
1068                                         tl_known:
1069 #if ((DEBUG) & DEBUG_C)
1070                                                 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1071                                                 strcat(logbuf, buf);
1072 #endif
1073                                                 flow->flags |= FLOW_TL;
1074                                 }
1075                         }
1076
1077                         /* Check for tcp flags presence (including CWR and ECE). */
1078                         if (flow->proto == IPPROTO_TCP
1079                                 && off_frag < 16
1080                                 && psize >= 16 - off_frag) {
1081                                 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1082 #if ((DEBUG) & DEBUG_C)
1083                                 sprintf(buf, " TCP:%x", flow->tcp_flags);
1084                                 strcat(logbuf, buf);
1085 #endif
1086                         }
1087
1088 #if ((DEBUG) & DEBUG_C)
1089                         sprintf(buf, " => %x", (unsigned) flow);
1090                         strcat(logbuf, buf);
1091                         my_log(LOG_DEBUG, "%s", logbuf);
1092 #endif
1093
1094 #if ((DEBUG) & DEBUG_I)
1095                         pkts_pending++;
1096                         pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1097                         if (pending_queue_trace < pending_queue_trace_candidate)
1098                                 pending_queue_trace = pending_queue_trace_candidate;
1099 #endif
1100
1101                         /* Flow complete - inform unpending_thread() about it */
1102                         pending_head->flags |= FLOW_PENDING;
1103                         pending_head = pending_head->next;
1104                 done:
1105                         pthread_cond_signal(&unpending_cond);
1106                 }
1107         }
1108         return 0;
1109 }
1110
1111 int main(int argc, char **argv)
1112 {
1113         char errpbuf[512];
1114         char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1115         int c, i, write_fd, memory_limit = 0;
1116         struct addrinfo hints, *res;
1117         struct sockaddr_in saddr;
1118         pthread_attr_t tattr;
1119         struct sigaction sigact;
1120         static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1121         struct timeval timeout;
1122
1123         sched_min = sched_get_priority_min(SCHED);
1124         sched_max = sched_get_priority_max(SCHED);
1125
1126         memset(&saddr, 0 , sizeof(saddr));
1127         memset(&hints, 0 , sizeof(hints));
1128         hints.ai_flags = AI_PASSIVE;
1129         hints.ai_family = AF_INET;
1130         hints.ai_socktype = SOCK_DGRAM;
1131
1132         /* Process command line options */
1133
1134         opterr = 0;
1135         while ((c = my_getopt(argc, argv, parms)) != -1) {
1136                 switch (c) {
1137                         case '?':
1138                                 usage();
1139
1140                         case 'h':
1141                                 usage();
1142                 }
1143         }
1144
1145         if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1146         if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1147         if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1148         if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1149         if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1150         if (parms[nflag].count) {
1151                 switch (atoi(parms[nflag].arg)) {
1152                         case 1:
1153                                 netflow = &NetFlow1;
1154                                 break;
1155
1156                         case 5:
1157                                 break;
1158
1159                         case 7:
1160                                 netflow = &NetFlow7;
1161                                 break;
1162
1163                         default:
1164                                 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1165                                 exit(1);
1166                 }
1167         }
1168         if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1169         if (parms[lflag].count) {
1170                 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1171                         *log_suffix++ = 0;
1172                         if (*log_suffix) {
1173                                 sprintf(errpbuf, "[%s]", log_suffix);
1174                                 strcat(ident, errpbuf);
1175                         }
1176                 }
1177                 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1178                 if (log_suffix) *--log_suffix = ':';
1179         }
1180         if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1181         err_malloc:
1182                 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1183                 exit(1);
1184         }
1185         sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1186         if (parms[qflag].count) {
1187                 pending_queue_length = atoi(parms[qflag].arg);
1188                 if (pending_queue_length < 1) {
1189                         fprintf(stderr, "Illegal %s\n", "pending queue length");
1190                         exit(1);
1191                 }
1192         }
1193         if (parms[rflag].count) {
1194                 schedp.sched_priority = atoi(parms[rflag].arg);
1195                 if (schedp.sched_priority
1196                         && (schedp.sched_priority < sched_min
1197                                 || schedp.sched_priority > sched_max)) {
1198                         fprintf(stderr, "Illegal %s\n", "realtime priority");
1199                         exit(1);
1200                 }
1201         }
1202         if (parms[Bflag].count) {
1203                 sockbufsize = atoi(parms[Bflag].arg) << 10;
1204         }
1205         if (parms[bflag].count) {
1206                 bulk_quantity = atoi(parms[bflag].arg);
1207                 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1208                         fprintf(stderr, "Illegal %s\n", "bulk size");
1209                         exit(1);
1210                 }
1211         }
1212         if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1213         if (parms[Xflag].count) {
1214                 for(i = 0; parms[Xflag].arg[i]; i++)
1215                         if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1216                 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1217                         goto err_malloc;
1218                 rule = strtok(parms[Xflag].arg, ":");
1219                 for (i = 0; rule; i++) {
1220                         snmp_rules[i].len = strlen(rule);
1221                         if (snmp_rules[i].len > IFNAMSIZ) {
1222                                 fprintf(stderr, "Illegal %s\n", "interface basename");
1223                                 exit(1);
1224                         }
1225                         strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1226                         if (!*(rule - 1)) *(rule - 1) = ',';
1227                         rule = strtok(NULL, ",");
1228                         if (!rule) {
1229                                 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1230                                 exit(1);
1231                         }
1232                         snmp_rules[i].base = atoi(rule);
1233                         *(rule - 1) = ':';
1234                         rule = strtok(NULL, ":");
1235                 }
1236                 nsnmp_rules = i;
1237         }
1238         if (parms[tflag].count)
1239                 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1240         if (parms[aflag].count) {
1241                 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1242                 bad_lhost:
1243                         fprintf(stderr, "Illegal %s\n", "source address");
1244                         exit(1);
1245                 } else {
1246                         saddr = *((struct sockaddr_in *) res->ai_addr);
1247                         freeaddrinfo(res);
1248                 }
1249         }
1250         if (parms[uflag].count) 
1251                 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1252                         fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1253                         exit(1);
1254                 }
1255
1256
1257         /* Process collectors parameters. Brrrr... :-[ */
1258
1259         npeers = argc - optind;
1260         if (npeers > 1) {
1261                 /* Send to remote Netflow collector */
1262                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1263                 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1264                         dhost = argv[i];
1265                         if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1266                         *dport++ = 0;
1267                         if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1268                                 fprintf(stderr, "socket(): %s\n", strerror(errno));
1269                                 exit(1);
1270                         }
1271                         peers[npeers].write_fd = write_fd;
1272                         peers[npeers].type = PEER_MIRROR;
1273                         peers[npeers].laddr = saddr;
1274                         peers[npeers].seq = 0;
1275                         if ((lhost = strchr(dport, '/'))) {
1276                                 *lhost++ = 0;
1277                                 if ((type = strchr(lhost, '/'))) {
1278                                         *type++ = 0;
1279                                         switch (*type) {
1280                                                 case 0:
1281                                                 case 'm':
1282                                                         break;
1283
1284                                                 case 'r':
1285                                                         peers[npeers].type = PEER_ROTATE;
1286                                                         npeers_rot++;
1287                                                         break;
1288
1289                                                 default:
1290                                                         goto bad_collector;
1291                                         }
1292                                 }
1293                                 if (*lhost) {
1294                                         if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1295                                         peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1296                                         freeaddrinfo(res);
1297                                 }
1298                         }
1299                         if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1300                                                 sizeof(struct sockaddr_in))) {
1301                                 fprintf(stderr, "bind(): %s\n", strerror(errno));
1302                                 exit(1);
1303                         }
1304                         if (getaddrinfo(dhost, dport, &hints, &res)) {
1305 bad_collector:
1306                                 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1307                                 exit(1);
1308                         }
1309                         peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1310                         freeaddrinfo(res);
1311                         if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1312                                                 sizeof(struct sockaddr_in))) {
1313                                 fprintf(stderr, "connect(): %s\n", strerror(errno));
1314                                 exit(1);
1315                         }
1316
1317                         /* Restore command line */
1318                         if (type) *--type = '/';
1319                         if (lhost) *--lhost = '/';
1320                         *--dport = ':';
1321                 }
1322         }
1323         else if (parms[fflag].count) {
1324                 // log into a file
1325                 char *fname;
1326                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1327                 fname = parms[fflag].arg;
1328                 if ((write_fd = open(fname, O_WRONLY|O_CREAT)) < 0) {
1329                         fprintf(stderr, "open(): %s (%s)\n", fname, strerror(errno));
1330                         exit(1);
1331                 }
1332                 peers[0].write_fd = write_fd;
1333                 peers[0].type = PEER_FILE;
1334                 peers[0].seq = 0;
1335                 npeers++;
1336         }
1337         else 
1338                 usage();
1339
1340
1341         if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1342         ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1343         if (!ulog_handle) {
1344                 fprintf(stderr, "libipulog initialization error: %s",
1345                         ipulog_strerror(ipulog_errno));
1346                 exit(1);
1347         }
1348         if (sockbufsize)
1349                 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1350                         &sockbufsize, sizeof(sockbufsize)) < 0)
1351                         fprintf(stderr, "setsockopt(): %s", strerror(errno));
1352
1353         /* Daemonize (if log destination stdout-free) */
1354
1355         my_log_open(ident, verbosity, log_dest);
1356         if (!(log_dest & 2)) {
1357                 switch (fork()) {
1358                         case -1:
1359                                 fprintf(stderr, "fork(): %s", strerror(errno));
1360                                 exit(1);
1361
1362                         case 0:
1363                                 setsid();
1364                                 freopen("/dev/null", "r", stdin);
1365                                 freopen("/dev/null", "w", stdout);
1366                                 freopen("/dev/null", "w", stderr);
1367                                 break;
1368
1369                         default:
1370                                 exit(0);
1371                 }
1372         } else {
1373                 setvbuf(stdout, (char *)0, _IONBF, 0);
1374                 setvbuf(stderr, (char *)0, _IONBF, 0);
1375         }
1376
1377         pid = getpid();
1378         sprintf(errpbuf, "[%ld]", (long) pid);
1379         strcat(ident, errpbuf);
1380
1381         /* Initialization */
1382
1383         hash_init(); /* Actually for crc16 only */
1384         mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1385         for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1386
1387 #ifdef UPTIME_TRICK
1388         /* Hope 12 days is enough :-/ */
1389         start_time_offset = 1 << 20;
1390
1391         /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1392 #endif
1393         gettime(&start_time);
1394
1395         /*
1396         Build static pending queue as circular buffer.
1397         */
1398         if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1399         pending_tail = pending_head;
1400         for (i = pending_queue_length - 1; i--;) {
1401                 if (!(pending_tail->next = mem_alloc())) {
1402                 err_mem_alloc:
1403                         my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1404                         exit(1);
1405                 }
1406                 pending_tail = pending_tail->next;
1407         }
1408         pending_tail->next = pending_head;
1409         pending_tail = pending_head;
1410
1411         sigemptyset(&sig_mask);
1412         sigact.sa_handler = &sighandler;
1413         sigact.sa_mask = sig_mask;
1414         sigact.sa_flags = 0;
1415         sigaddset(&sig_mask, SIGTERM);
1416         sigaction(SIGTERM, &sigact, 0);
1417 #if ((DEBUG) & DEBUG_I)
1418         sigaddset(&sig_mask, SIGUSR1);
1419         sigaction(SIGUSR1, &sigact, 0);
1420 #endif
1421         if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1422                 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1423                 exit(1);
1424         }
1425
1426         my_log(LOG_INFO, "Starting %s...", VERSION);
1427
1428         if (parms[cflag].count) {
1429                 if (chdir(parms[cflag].arg) || chroot(".")) {
1430                         my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1431                         exit(1);
1432                 }
1433         }
1434
1435         schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1436         pthread_attr_init(&tattr);
1437         for (i = 0; i < THREADS - 1; i++) {
1438                 if (schedp.sched_priority > 0) {
1439                         if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1440                                 (pthread_attr_setschedparam(&tattr, &schedp))) {
1441                                 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1442                                 exit(1);
1443                         }
1444                 }
1445                 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1446                         my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1447                         exit(1);
1448                 }
1449                 pthread_detach(thid);
1450                 schedp.sched_priority++;
1451         }
1452
1453         if (pw) {
1454                 if (setgroups(0, NULL)) {
1455                         my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1456                         exit(1);
1457                 }
1458                 if (setregid(pw->pw_gid, pw->pw_gid)) {
1459                         my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1460                         exit(1);
1461                 }
1462                 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1463                         my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1464                         exit(1);
1465                 }
1466         }
1467
1468         if (!(pidfile = fopen(pidfilepath, "w")))
1469                 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1470         else {
1471                 fprintf(pidfile, "%ld\n", (long) pid);
1472                 fclose(pidfile);
1473         }
1474
1475         my_log(LOG_INFO, "pid: %d", pid);
1476         my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1477                 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1478                 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1479                 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1480                 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1481                 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1482                 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1483         for (i = 0; i < nsnmp_rules; i++) {
1484                 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1485                         i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1486         }
1487         for (i = 0; i < npeers; i++) {
1488                 switch (peers[i].type) {
1489                         case PEER_MIRROR:
1490                                 c = 'm';
1491                                 break;
1492                         case PEER_ROTATE:
1493                                 c = 'r';
1494                                 break;
1495                 }
1496                 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1497                 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1498                         inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1499         }
1500
1501         pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1502
1503         timeout.tv_usec = 0;
1504         while (!killed
1505                 || (total_elements - free_elements - pending_queue_length)
1506                 || emit_count
1507                 || pending_tail->flags) {
1508
1509                 if (!sigs) {
1510                         timeout.tv_sec = scan_interval;
1511                         select(0, 0, 0, 0, &timeout);
1512                 }
1513
1514                 if (sigs & SIGTERM_MASK && !killed) {
1515                         sigs &= ~SIGTERM_MASK;
1516                         my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1517                         scan_interval = 1;
1518                         frag_lifetime = -1;
1519                         active_lifetime = -1;
1520                         inactive_lifetime = -1;
1521                         emit_timeout = 1;
1522                         unpending_timeout = 1;
1523                         killed = 1;
1524                         pthread_cond_signal(&scan_cond);
1525                         pthread_cond_signal(&unpending_cond);
1526                 }
1527
1528 #if ((DEBUG) & DEBUG_I)
1529                 if (sigs & SIGUSR1_MASK) {
1530                         sigs &= ~SIGUSR1_MASK;
1531                         info_debug();
1532                 }
1533 #endif
1534         }
1535         remove(pidfilepath);
1536 #if ((DEBUG) & DEBUG_I)
1537         info_debug();
1538 #endif
1539         my_log(LOG_INFO, "Done.");
1540 #ifdef WALL
1541         return 0;
1542 #endif
1543 }