Re-import of fprobe-ulog
[fprobe-ulog.git] / src / fprobe-ulog.c
1 /*
2         Copyright (C) Slava Astashonok <sla@0n.ru>
3
4         This program is free software; you can redistribute it and/or
5         modify it under the terms of the GNU General Public License.
6
7         $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
8 */
9
10 #include <common.h>
11
12 /* stdout, stderr, freopen() */
13 #include <stdio.h>
14
15 /* atoi(), exit() */
16 #include <stdlib.h>
17
18 /* getopt(), alarm(), getpid(), sedsid(), chdir() */
19 #include <unistd.h>
20
21 /* strerror() */
22 #include <string.h>
23
24 /* sig*() */
25 #include <signal.h>
26
27 #include <libipulog/libipulog.h>
28 struct ipulog_handle {
29         int fd;
30         u_int8_t blocking;
31         struct sockaddr_nl local;
32         struct sockaddr_nl peer;
33         struct nlmsghdr* last_nlhdr;
34 };
35
36 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
37 #include <sys/types.h>
38 #include <netinet/in_systm.h>
39 #include <sys/socket.h>
40 #include <netinet/in.h>
41 #include <arpa/inet.h>
42 #include <netinet/ip.h>
43 #include <netinet/tcp.h>
44 #include <netinet/udp.h>
45 #include <netinet/ip_icmp.h>
46 #include <net/if.h>
47
48 #include <sys/param.h>
49 #include <pwd.h>
50 #ifdef OS_LINUX
51 #include <grp.h>
52 #endif
53
54 /* pthread_*() */
55 #include <pthread.h>
56
57 /* errno */
58 #include <errno.h>
59
60 /* getaddrinfo() */
61 #include <netdb.h>
62
63 /* nanosleep() */
64 #include <time.h>
65
66 /* gettimeofday() */
67 #include <sys/time.h>
68
69 /* scheduling */
70 #include <sched.h>
71
72 /* select() (POSIX)*/
73 #include <sys/select.h>
74
75 /* open() */
76 #include <sys/stat.h>
77 #include <fcntl.h>
78
79 #include <fprobe-ulog.h>
80 #include <my_log.h>
81 #include <my_getopt.h>
82 #include <netflow.h>
83 #include <hash.h>
84 #include <mem.h>
85
86 enum {
87         aflag,
88         Bflag,
89         bflag,
90         cflag,
91         dflag,
92         eflag,
93         fflag,
94         gflag,
95         hflag,
96         lflag,
97         mflag,
98         Mflag,
99         nflag,
100         qflag,
101         rflag,
102         sflag,
103         tflag,
104         Uflag,
105         uflag,
106         vflag,
107         Xflag,
108 };
109
110 static struct getopt_parms parms[] = {
111         {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
112         {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
113         {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
114         {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
115         {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
116         {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
117         {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
118         {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
119         {'h', 0, 0, 0},
120         {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
121         {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
122         {'M', 0, 0, 0},
123         {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
124         {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
125         {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
126         {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
127         {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
128         {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
129         {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
130         {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
131         {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
132         {0, 0, 0, 0}
133 };
134
135 extern char *optarg;
136 extern int optind, opterr, optopt;
137 extern int errno;
138
139 extern struct NetFlow NetFlow1;
140 extern struct NetFlow NetFlow5;
141 extern struct NetFlow NetFlow7;
142
143 #define mark_is_tos parms[Mflag].count
144 static unsigned scan_interval = 5;
145 static int frag_lifetime = 30;
146 static int inactive_lifetime = 60;
147 static int active_lifetime = 300;
148 static int sockbufsize;
149 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
150 #if (MEM_BITS == 0) || (MEM_BITS == 16)
151 #define BULK_QUANTITY 10000
152 #else
153 #define BULK_QUANTITY 200
154 #endif
155 static unsigned bulk_quantity = BULK_QUANTITY;
156 static unsigned pending_queue_length = 100;
157 static struct NetFlow *netflow = &NetFlow5;
158 static unsigned verbosity = 6;
159 static unsigned log_dest = MY_LOG_SYSLOG;
160 static struct Time start_time;
161 static long start_time_offset;
162 static int off_tl;
163 /* From mem.c */
164 extern unsigned total_elements;
165 extern unsigned free_elements;
166 extern unsigned total_memory;
167 #if ((DEBUG) & DEBUG_I)
168 static unsigned emit_pkts, emit_queue;
169 static uint64_t size_total;
170 static unsigned pkts_total, pkts_total_fragmented;
171 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
172 static unsigned pkts_pending, pkts_pending_done;
173 static unsigned pending_queue_trace, pending_queue_trace_candidate;
174 static unsigned flows_total, flows_fragmented;
175 #endif
176 static unsigned emit_count;
177 static uint32_t emit_sequence;
178 static unsigned emit_rate_bytes, emit_rate_delay;
179 static struct Time emit_time;
180 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
181 static pthread_t thid;
182 static sigset_t sig_mask;
183 static struct sched_param schedp;
184 static int sched_min, sched_max;
185 static int npeers, npeers_rot;
186 static struct peer *peers;
187 static int sigs;
188
189 static struct Flow *flows[1 << HASH_BITS];
190 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
191
192 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
193 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
194
195 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
196 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
197 static struct Flow *pending_head, *pending_tail;
198 static struct Flow *scan_frag_dreg;
199
200 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
201 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
202 static struct Flow *flows_emit;
203
204 static char ident[256] = "fprobe-ulog";
205 static FILE *pidfile;
206 static char *pidfilepath;
207 static pid_t pid;
208 static int killed;
209 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
210 static struct ipulog_handle *ulog_handle;
211 static uint32_t ulog_gmask = 1;
212 static char *cap_buf;
213 static int nsnmp_rules;
214 static struct snmp_rule *snmp_rules;
215 static struct passwd *pw = 0;
216
217 void usage()
218 {
219         fprintf(stdout,
220                 "fprobe-ulog: a NetFlow probe. Version %s\n"
221                 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
222                 "\n"
223                 "-h\t\tDisplay this help\n"
224                 "-U <mask>\tULOG group bitwise mask [1]\n"
225                 "-s <seconds>\tHow often scan for expired flows [5]\n"
226                 "-g <seconds>\tFragmented flow lifetime [30]\n"
227                 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
228                 "-f <filename>\tLog flow data in a file\n"
229                 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
230                 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
231                 "-a <address>\tUse <address> as source for NetFlow flow\n"
232                 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
233                 "-M\t\tUse netfilter mark value as ToS flag\n"
234                 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
235                 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
236                 "-q <flows>\tPending queue length [100]\n"
237                 "-B <kilobytes>\tKernel capture buffer size [0]\n"
238                 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
239                 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
240                 "-c <directory>\tDirectory to chroot to\n"
241                 "-u <user>\tUser to run as\n"
242                 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
243                 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
244                 "-y <remote:port>\tAddress of the NetFlow collector\n",
245                 "-f <writable file>\tFile to write data into\n"
246                 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
247         exit(0);
248 }
249
250 #if ((DEBUG) & DEBUG_I)
251 void info_debug()
252 {
253         my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
254                 pkts_total, pkts_total_fragmented, size_total,
255                 pkts_pending - pkts_pending_done, pending_queue_trace);
256         my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
257                 pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
258         my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
259                 flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
260         my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
261                 total_elements, free_elements, total_memory);
262 }
263 #endif
264
265 void sighandler(int sig)
266 {
267         switch (sig) {
268                 case SIGTERM:
269                         sigs |= SIGTERM_MASK;
270                         break;
271 #if ((DEBUG) & DEBUG_I)
272                 case SIGUSR1:
273                         sigs |= SIGUSR1_MASK;
274                         break;
275 #endif
276         }
277 }
278
279 void gettime(struct Time *now)
280 {
281         struct timeval t;
282
283         gettimeofday(&t, 0);
284         now->sec = t.tv_sec;
285         now->usec = t.tv_usec;
286 }
287
288 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
289 {
290         return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
291 }
292
293 /* Uptime in miliseconds */
294 uint32_t getuptime(struct Time *t)
295 {
296         /* Maximum uptime is about 49/2 days */
297         return cmpmtime(t, &start_time);
298 }
299
300 hash_t hash_flow(struct Flow *flow)
301 {
302         if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
303         else return hash(flow, sizeof(struct Flow_TL));
304 }
305
306 uint16_t snmp_index(char *name) {
307         uint32_t i;
308
309         if (!*name) return 0;
310
311         for (i = 0; (int) i < nsnmp_rules; i++) {
312                 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
313                 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
314         }
315
316         if ((i = if_nametoindex(name))) return i;
317
318         return -1;
319 }
320
321 inline void copy_flow(struct Flow *src, struct Flow *dst)
322 {
323         dst->iif = src->iif;
324         dst->oif = src->oif;
325         dst->sip = src->sip;
326         dst->dip = src->dip;
327         dst->tos = src->tos;
328         dst->proto = src->proto;
329         dst->tcp_flags = src->tcp_flags;
330         dst->id = src->id;
331         dst->sp = src->sp;
332         dst->dp = src->dp;
333         dst->pkts = src->pkts;
334         dst->size = src->size;
335         dst->sizeF = src->sizeF;
336         dst->sizeP = src->sizeP;
337         dst->ctime = src->ctime;
338         dst->mtime = src->mtime;
339         dst->flags = src->flags;
340 }
341
342 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
343 {
344         struct Flow **flowpp;
345
346 #ifdef WALL
347         flowpp = 0;
348 #endif
349
350         if (prev) flowpp = *prev;
351
352         while (where) {
353                 if (where->sip.s_addr == what->sip.s_addr
354                         && where->dip.s_addr == what->dip.s_addr
355                         && where->proto == what->proto) {
356                         switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
357                                 case 0:
358                                         /* Both unfragmented */
359                                         if ((what->sp == where->sp)
360                                                 && (what->dp == where->dp)) goto done;
361                                         break;
362                                 case 2:
363                                         /* Both fragmented */
364                                         if (where->id == what->id) goto done;
365                                         break;
366                         }
367                 }
368                 flowpp = &where->next;
369                 where = where->next;
370         }
371 done:
372         if (prev) *prev = flowpp;
373         return where;
374 }
375
376 int put_into(struct Flow *flow, int flag
377 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
378         , char *logbuf
379 #endif
380 )
381 {
382         int ret = 0;
383         hash_t h;
384         struct Flow *flown, **flowpp;
385 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
386         char buf[64];
387 #endif
388
389         h = hash_flow(flow);
390 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
391         sprintf(buf, " %x H:%04x", (unsigned) flow, h);
392         strcat(logbuf, buf);
393 #endif
394         pthread_mutex_lock(&flows_mutex[h]);
395         flowpp = &flows[h];
396         if (!(flown = find(flows[h], flow, &flowpp))) {
397                 /* No suitable flow found - add */
398                 if (flag == COPY_INTO) {
399                         if ((flown = mem_alloc())) {
400                                 copy_flow(flow, flown);
401                                 flow = flown;
402                         } else {
403 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
404                                 my_log(LOG_ERR, "%s %s. %s",
405                                         "mem_alloc():", strerror(errno), "packet lost");
406 #endif
407                                 return -1;
408                         }
409                 }
410                 flow->next = flows[h];
411                 flows[h] = flow;
412 #if ((DEBUG) & DEBUG_I)
413                 flows_total++;
414                 if (flow->flags & FLOW_FRAG) flows_fragmented++;
415 #endif
416 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
417                 if (flown) {
418                         sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
419                         strcat(logbuf, buf);
420                 }
421 #endif
422         } else {
423                 /* Found suitable flow - update */
424 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
425                 sprintf(buf, " +> %x", (unsigned) flown);
426                 strcat(logbuf, buf);
427 #endif
428                 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
429                         flown->mtime = flow->mtime;
430                 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
431                         flown->ctime = flow->ctime;
432                 flown->tcp_flags |= flow->tcp_flags;
433                 flown->size += flow->size;
434                 flown->pkts += flow->pkts;
435                 if (flow->flags & FLOW_FRAG) {
436                         /* Fragmented flow require some additional work */
437                         if (flow->flags & FLOW_TL) {
438                                 /*
439                                 ?FIXME?
440                                 Several packets with FLOW_TL (attack)
441                                 */
442                                 flown->sp = flow->sp;
443                                 flown->dp = flow->dp;
444                         }
445                         if (flow->flags & FLOW_LASTFRAG) {
446                                 /*
447                                 ?FIXME?
448                                 Several packets with FLOW_LASTFRAG (attack)
449                                 */
450                                 flown->sizeP = flow->sizeP;
451                         }
452                         flown->flags |= flow->flags;
453                         flown->sizeF += flow->sizeF;
454                         if ((flown->flags & FLOW_LASTFRAG)
455                                 && (flown->sizeF >= flown->sizeP)) {
456                                 /* All fragments received - flow reassembled */
457                                 *flowpp = flown->next;
458                                 pthread_mutex_unlock(&flows_mutex[h]);
459 #if ((DEBUG) & DEBUG_I)
460                                 flows_total--;
461                                 flows_fragmented--;
462 #endif
463                                 flown->id = 0;
464                                 flown->flags &= ~FLOW_FRAG;
465 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
466                                 strcat(logbuf," R");
467 #endif
468                                 ret = put_into(flown, MOVE_INTO
469 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
470                                                 , logbuf
471 #endif
472                                         );
473                         }
474                 }
475                 if (flag == MOVE_INTO) mem_free(flow);
476         }
477         pthread_mutex_unlock(&flows_mutex[h]);
478         return ret;
479 }
480
481 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
482 {
483         int i;
484
485         for (i = 0; i < fields; i++) {
486 #if ((DEBUG) & DEBUG_F)
487                 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
488 #endif
489                 switch (format[i]) {
490                         case NETFLOW_IPV4_SRC_ADDR:
491                                 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
492                                 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
493                                 break;
494
495                         case NETFLOW_IPV4_DST_ADDR:
496                                 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
497                                 p += NETFLOW_IPV4_DST_ADDR_SIZE;
498                                 break;
499
500                         case NETFLOW_INPUT_SNMP:
501                                 *((uint16_t *) p) = htons(flow->iif);
502                                 p += NETFLOW_INPUT_SNMP_SIZE;
503                                 break;
504
505                         case NETFLOW_OUTPUT_SNMP:
506                                 *((uint16_t *) p) = htons(flow->oif);
507                                 p += NETFLOW_OUTPUT_SNMP_SIZE;
508                                 break;
509
510                         case NETFLOW_PKTS_32:
511                                 *((uint32_t *) p) = htonl(flow->pkts);
512                                 p += NETFLOW_PKTS_32_SIZE;
513                                 break;
514
515                         case NETFLOW_BYTES_32:
516                                 *((uint32_t *) p) = htonl(flow->size);
517                                 p += NETFLOW_BYTES_32_SIZE;
518                                 break;
519
520                         case NETFLOW_FIRST_SWITCHED:
521                                 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
522                                 p += NETFLOW_FIRST_SWITCHED_SIZE;
523                                 break;
524
525                         case NETFLOW_LAST_SWITCHED:
526                                 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
527                                 p += NETFLOW_LAST_SWITCHED_SIZE;
528                                 break;
529
530                         case NETFLOW_L4_SRC_PORT:
531                                 *((uint16_t *) p) = flow->sp;
532                                 p += NETFLOW_L4_SRC_PORT_SIZE;
533                                 break;
534
535                         case NETFLOW_L4_DST_PORT:
536                                 *((uint16_t *) p) = flow->dp;
537                                 p += NETFLOW_L4_DST_PORT_SIZE;
538                                 break;
539
540                         case NETFLOW_PROT:
541                                 *((uint8_t *) p) = flow->proto;
542                                 p += NETFLOW_PROT_SIZE;
543                                 break;
544
545                         case NETFLOW_SRC_TOS:
546                                 *((uint8_t *) p) = flow->tos;
547                                 p += NETFLOW_SRC_TOS_SIZE;
548                                 break;
549
550                         case NETFLOW_TCP_FLAGS:
551                                 *((uint8_t *) p) = flow->tcp_flags;
552                                 p += NETFLOW_TCP_FLAGS_SIZE;
553                                 break;
554
555                         case NETFLOW_VERSION:
556                                 *((uint16_t *) p) = htons(netflow->Version);
557                                 p += NETFLOW_VERSION_SIZE;
558                                 break;
559
560                         case NETFLOW_COUNT:
561                                 *((uint16_t *) p) = htons(emit_count);
562                                 p += NETFLOW_COUNT_SIZE;
563                                 break;
564
565                         case NETFLOW_UPTIME:
566                                 *((uint32_t *) p) = htonl(getuptime(&emit_time));
567                                 p += NETFLOW_UPTIME_SIZE;
568                                 break;
569
570                         case NETFLOW_UNIX_SECS:
571                                 *((uint32_t *) p) = htonl(emit_time.sec);
572                                 p += NETFLOW_UNIX_SECS_SIZE;
573                                 break;
574
575                         case NETFLOW_UNIX_NSECS:
576                                 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
577                                 p += NETFLOW_UNIX_NSECS_SIZE;
578                                 break;
579
580                         case NETFLOW_FLOW_SEQUENCE:
581                                 //*((uint32_t *) p) = htonl(emit_sequence);
582                                 *((uint32_t *) p) = 0;
583                                 p += NETFLOW_FLOW_SEQUENCE_SIZE;
584                                 break;
585
586                         case NETFLOW_PAD8:
587                         /* Unsupported (uint8_t) */
588                         case NETFLOW_ENGINE_TYPE:
589                         case NETFLOW_ENGINE_ID:
590                         case NETFLOW_FLAGS7_1:
591                         case NETFLOW_SRC_MASK:
592                         case NETFLOW_DST_MASK:
593                                 *((uint8_t *) p) = 0;
594                                 p += NETFLOW_PAD8_SIZE;
595                                 break;
596
597                         case NETFLOW_PAD16:
598                         /* Unsupported (uint16_t) */
599                         case NETFLOW_SRC_AS:
600                         case NETFLOW_DST_AS:
601                         case NETFLOW_FLAGS7_2:
602                                 *((uint16_t *) p) = 0;
603                                 p += NETFLOW_PAD16_SIZE;
604                                 break;
605
606                         case NETFLOW_PAD32:
607                         /* Unsupported (uint32_t) */
608                         case NETFLOW_IPV4_NEXT_HOP:
609                         case NETFLOW_ROUTER_SC:
610                                 *((uint32_t *) p) = 0;
611                                 p += NETFLOW_PAD32_SIZE;
612                                 break;
613
614                         default:
615                                 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
616                                         format, i, format[i]);
617                                 exit(1);
618                 }
619         }
620 #if ((DEBUG) & DEBUG_F)
621         my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
622 #endif
623         return p;
624 }
625
626 void setuser() {
627         /*
628         Workaround for clone()-based threads
629         Try to change EUID independently of main thread
630         */
631         if (pw) {
632                 setgroups(0, NULL);
633                 setregid(pw->pw_gid, pw->pw_gid);
634                 setreuid(pw->pw_uid, pw->pw_uid);
635         }
636 }
637
638 void *emit_thread()
639 {
640         struct Flow *flow;
641         void *p;
642         struct timeval now;
643         struct timespec timeout;
644         int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
645
646         p = (void *) &emit_packet + netflow->HeaderSize;
647         timeout.tv_nsec = 0;
648
649         setuser();
650
651         for (;;) {
652                 pthread_mutex_lock(&emit_mutex);
653                 while (!flows_emit) {
654                         gettimeofday(&now, 0);
655                         timeout.tv_sec = now.tv_sec + emit_timeout;
656                         /* Do not wait until emit_packet will filled - it may be too long */
657                         if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
658                                 pthread_mutex_unlock(&emit_mutex);
659                                 goto sendit;
660                         }
661                 }
662                 flow = flows_emit;
663                 flows_emit = flows_emit->next;
664 #if ((DEBUG) & DEBUG_I)
665                 emit_queue--;
666 #endif          
667                 pthread_mutex_unlock(&emit_mutex);
668
669 #ifdef UPTIME_TRICK
670                 if (!emit_count) {
671                         gettime(&start_time);
672                         start_time.sec -= start_time_offset;
673                 }
674 #endif
675                 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
676                 mem_free(flow);
677                 emit_count++;
678 #ifdef PF2_DEBUG
679                 printf("Emit count = %d\n", emit_count);
680                 fflush(stdout);
681 #endif
682                 if (emit_count == netflow->MaxFlows) {
683                 sendit:
684                         gettime(&emit_time);
685                         p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
686                         size = netflow->HeaderSize + emit_count * netflow->FlowSize;
687                         /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
688                         if (size < 1464) size = 1464;
689                         peer_rot_cur = 0;
690                         for (i = 0; i < npeers; i++) {
691                                 if (peers[0].type == PEER_FILE) {
692                                                 if (netflow->SeqOffset)
693                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
694 #define MESSAGES
695                                                 ret = write(peers[0].write_fd, emit_packet, size);
696                                                 if (ret < size) {
697 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
698                                                         my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
699                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
700 #endif
701 #undef MESSAGES
702                                                 }
703 #if ((DEBUG) & DEBUG_E)
704                                                 commaneelse {
705                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
706                                                                 emit_count, i + 1, peers[i].seq);
707                                                 }
708 #endif
709                                                 peers[0].seq += emit_count;
710
711                                                 /* Rate limit */
712                                                 if (emit_rate_bytes) {
713                                                         sent += size;
714                                                         delay = sent / emit_rate_bytes;
715                                                         if (delay) {
716                                                                 sent %= emit_rate_bytes;
717                                                                 timeout.tv_sec = 0;
718                                                                 timeout.tv_nsec = emit_rate_delay * delay;
719                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
720                                                         }
721                                                 }
722                                         }
723                                 else
724                                 if (peers[i].type == PEER_MIRROR) goto sendreal;
725                                 else
726                                 if (peers[i].type == PEER_ROTATE) 
727                                         if (peer_rot_cur++ == peer_rot_work) {
728                                         sendreal:
729                                                 if (netflow->SeqOffset)
730                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
731                                                 ret = send(peers[i].write_fd, emit_packet, size, 0);
732                                                 if (ret < size) {
733 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
734                                                         my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
735                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
736 #endif
737                                                 }
738 #if ((DEBUG) & DEBUG_E)
739                                                 commaneelse {
740                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
741                                                                 emit_count, i + 1, peers[i].seq);
742                                                 }
743 #endif
744                                                 peers[i].seq += emit_count;
745
746                                                 /* Rate limit */
747                                                 if (emit_rate_bytes) {
748                                                         sent += size;
749                                                         delay = sent / emit_rate_bytes;
750                                                         if (delay) {
751                                                                 sent %= emit_rate_bytes;
752                                                                 timeout.tv_sec = 0;
753                                                                 timeout.tv_nsec = emit_rate_delay * delay;
754                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
755                                                         }
756                                                 }
757                                         }
758                         }
759                         if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
760                         emit_sequence += emit_count;
761                         emit_count = 0;
762 #if ((DEBUG) & DEBUG_I)
763                         emit_pkts++;
764 #endif
765                 }
766         }
767 }       
768
769 void *unpending_thread()
770 {
771         struct timeval now;
772         struct timespec timeout;
773 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
774         char logbuf[256];
775 #endif
776
777         setuser();
778
779         timeout.tv_nsec = 0;
780         pthread_mutex_lock(&unpending_mutex);
781
782         for (;;) {
783                 while (!(pending_tail->flags & FLOW_PENDING)) {
784                         gettimeofday(&now, 0);
785                         timeout.tv_sec = now.tv_sec + unpending_timeout;
786                         pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
787                 }
788
789 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
790                 *logbuf = 0;
791 #endif
792                 if (put_into(pending_tail, COPY_INTO
793 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
794                                 , logbuf
795 #endif
796                         ) < 0) {
797 #if ((DEBUG) & DEBUG_I)
798                         pkts_lost_unpending++;
799 #endif                          
800                 }
801
802 #if ((DEBUG) & DEBUG_U)
803                 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
804 #endif
805
806                 pending_tail->flags = 0;
807                 pending_tail = pending_tail->next;
808 #if ((DEBUG) & DEBUG_I)
809                 pkts_pending_done++;
810 #endif
811         }
812 }
813
814 void *scan_thread()
815 {
816 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
817         char logbuf[256];
818 #endif
819         int i;
820         struct Flow *flow, **flowpp;
821         struct Time now;
822         struct timespec timeout;
823
824         setuser();
825
826         timeout.tv_nsec = 0;
827         pthread_mutex_lock(&scan_mutex);
828
829         for (;;) {
830                 gettime(&now);
831                 timeout.tv_sec = now.sec + scan_interval;
832                 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
833
834                 gettime(&now);
835 #if ((DEBUG) & DEBUG_S)
836                 my_log(LOG_DEBUG, "S: %d", now.sec);
837 #endif
838                 for (i = 0; i < 1 << HASH_BITS ; i++) {
839                         pthread_mutex_lock(&flows_mutex[i]);
840                         flow = flows[i];
841                         flowpp = &flows[i];
842                         while (flow) {
843                                 if (flow->flags & FLOW_FRAG) {
844                                         /* Process fragmented flow */
845                                         if ((now.sec - flow->mtime.sec) > frag_lifetime) {
846                                                 /* Fragmented flow expired - put it into special chain */
847 #if ((DEBUG) & DEBUG_I)
848                                                 flows_fragmented--;
849                                                 flows_total--;
850 #endif
851                                                 *flowpp = flow->next;
852                                                 flow->id = 0;
853                                                 flow->flags &= ~FLOW_FRAG;
854                                                 flow->next = scan_frag_dreg;
855                                                 scan_frag_dreg = flow;
856                                                 flow = *flowpp;
857                                                 continue;
858                                         }
859                                 } else {
860                                         /* Flow is not frgamented */
861                                         if ((now.sec - flow->mtime.sec) > inactive_lifetime
862                                                 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
863                                                 /* Flow expired */
864 #if ((DEBUG) & DEBUG_S)
865                                                 my_log(LOG_DEBUG, "S: E %x", flow);
866 #endif
867 #if ((DEBUG) & DEBUG_I)
868                                                 flows_total--;
869 #endif
870                                                 *flowpp = flow->next;
871                                                 pthread_mutex_lock(&emit_mutex);
872                                                 flow->next = flows_emit;
873                                                 flows_emit = flow;
874 #if ((DEBUG) & DEBUG_I)
875                                                 emit_queue++;
876 #endif                          
877                                                 pthread_mutex_unlock(&emit_mutex);
878                                                 flow = *flowpp;
879                                                 continue;
880                                         }
881                                 }
882                                 flowpp = &flow->next;
883                                 flow = flow->next;
884                         } /* chain loop */
885                         pthread_mutex_unlock(&flows_mutex[i]);
886                 } /* hash loop */
887                 if (flows_emit) pthread_cond_signal(&emit_cond);
888
889                 while (scan_frag_dreg) {
890                         flow = scan_frag_dreg;
891                         scan_frag_dreg = flow->next;
892 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
893                         *logbuf = 0;
894 #endif
895                         put_into(flow, MOVE_INTO
896 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
897                                 , logbuf
898 #endif
899                         );
900 #if ((DEBUG) & DEBUG_S)
901                         my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
902 #endif
903                 }
904         }
905 }
906
907 void *cap_thread()
908 {
909         struct ulog_packet_msg *ulog_msg;
910         struct ip *nl;
911         void *tl;
912         struct Flow *flow;
913         int len, off_frag, psize;
914 #if ((DEBUG) & DEBUG_C)
915         char buf[64];
916         char logbuf[256];
917 #endif
918
919         setuser();
920
921         while (!killed) {
922                 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
923                 if (len <= 0) {
924                         my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
925                         continue;
926                 }
927                 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
928
929 #if ((DEBUG) & DEBUG_C)
930                         sprintf(logbuf, "C: %d", ulog_msg->data_len);
931 #endif
932
933                         nl = (void *) &ulog_msg->payload;
934                         psize = ulog_msg->data_len;
935
936                         /* Sanity check */
937                         if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
938 #if ((DEBUG) & DEBUG_C)
939                                 strcat(logbuf, " U");
940                                 my_log(LOG_DEBUG, "%s", logbuf);
941 #endif
942 #if ((DEBUG) & DEBUG_I)
943                                 pkts_ignored++;
944 #endif
945                                 continue;
946                         }
947
948                         if (pending_head->flags) {
949 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
950                                 my_log(LOG_ERR,
951 # if ((DEBUG) & DEBUG_C)
952                                         "%s %s %s", logbuf,
953 # else
954                                         "%s %s",
955 # endif
956                                         "pending queue full:", "packet lost");
957 #endif
958 #if ((DEBUG) & DEBUG_I)
959                                 pkts_lost_capture++;
960 #endif
961                                 goto done;
962                         }
963
964 #if ((DEBUG) & DEBUG_I)
965                         pkts_total++;
966 #endif
967
968                         flow = pending_head;
969
970                         /* ?FIXME? Add sanity check for ip_len? */
971                         flow->size = ntohs(nl->ip_len);
972 #if ((DEBUG) & DEBUG_I)
973                         size_total += flow->size;
974 #endif
975
976                         flow->sip = nl->ip_src;
977                         flow->dip = nl->ip_dst;
978                         flow->iif = snmp_index(ulog_msg->indev_name);
979                         flow->oif = snmp_index(ulog_msg->outdev_name);
980                         flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
981                         flow->proto = nl->ip_p;
982                         flow->id = 0;
983                         flow->tcp_flags = 0;
984                         flow->pkts = 1;
985                         flow->sizeF = 0;
986                         flow->sizeP = 0;
987                         /* Packets captured from OUTPUT table didn't contains valid timestamp */
988                         if (ulog_msg->timestamp_sec) {
989                                 flow->ctime.sec = ulog_msg->timestamp_sec;
990                                 flow->ctime.usec = ulog_msg->timestamp_usec;
991                         } else gettime(&flow->ctime);
992                         flow->mtime = flow->ctime;
993
994                         off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
995
996                         /*
997                         Offset (from network layer) to transport layer header/IP data
998                         IOW IP header size ;-)
999
1000                         ?FIXME?
1001                         Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1002                         */
1003                         off_tl = nl->ip_hl << 2;
1004                         tl = (void *) nl + off_tl;
1005
1006                         /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1007                         flow->sizeF = ntohs(nl->ip_len) - off_tl;
1008                         psize -= off_tl;
1009                         if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1010                         if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1011
1012                         if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1013                                 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1014 #if ((DEBUG) & DEBUG_C)
1015                                 strcat(logbuf, " F");
1016 #endif
1017 #if ((DEBUG) & DEBUG_I)
1018                                 pkts_total_fragmented++;
1019 #endif
1020                                 flow->flags |= FLOW_FRAG;
1021                                 flow->id = nl->ip_id;
1022
1023                                 if (!(ntohs(nl->ip_off) & IP_MF)) {
1024                                         /* Packet whith IP_MF contains information about whole datagram size */
1025                                         flow->flags |= FLOW_LASTFRAG;
1026                                         /* size = frag_offset*8 + data_size */
1027                                         flow->sizeP = off_frag + flow->sizeF;
1028                                 }
1029                         }
1030
1031 #if ((DEBUG) & DEBUG_C)
1032                         sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1033                         strcat(logbuf, buf);
1034                         sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1035                         strcat(logbuf, buf);
1036 #endif
1037
1038                         /*
1039                         Fortunately most interesting transport layer information fit
1040                         into first 8 bytes of IP data field (minimal nonzero size).
1041                         Thus we don't need actual packet reassembling to build whole
1042                         transport layer data. We only check the fragment offset for
1043                         zero value to find packet with this information.
1044                         */
1045                         if (!off_frag && psize >= 8) {
1046                                 switch (flow->proto) {
1047                                         case IPPROTO_TCP:
1048                                         case IPPROTO_UDP:
1049                                                 flow->sp = ((struct udphdr *)tl)->uh_sport;
1050                                                 flow->dp = ((struct udphdr *)tl)->uh_dport;
1051                                                 goto tl_known;
1052
1053 #ifdef ICMP_TRICK
1054                                         case IPPROTO_ICMP:
1055                                                 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1056                                                 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1057                                                 goto tl_known;
1058 #endif
1059 #ifdef ICMP_TRICK_CISCO
1060                                         case IPPROTO_ICMP:
1061                                                 flow->dp = *((int32_t *) tl);
1062                                                 goto tl_known;
1063 #endif
1064
1065                                         default:
1066                                                 /* Unknown transport layer */
1067 #if ((DEBUG) & DEBUG_C)
1068                                                 strcat(logbuf, " U");
1069 #endif
1070                                                 flow->sp = 0;
1071                                                 flow->dp = 0;
1072                                                 break;
1073
1074                                         tl_known:
1075 #if ((DEBUG) & DEBUG_C)
1076                                                 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1077                                                 strcat(logbuf, buf);
1078 #endif
1079                                                 flow->flags |= FLOW_TL;
1080                                 }
1081                         }
1082
1083                         /* Check for tcp flags presence (including CWR and ECE). */
1084                         if (flow->proto == IPPROTO_TCP
1085                                 && off_frag < 16
1086                                 && psize >= 16 - off_frag) {
1087                                 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1088 #if ((DEBUG) & DEBUG_C)
1089                                 sprintf(buf, " TCP:%x", flow->tcp_flags);
1090                                 strcat(logbuf, buf);
1091 #endif
1092                         }
1093
1094 #if ((DEBUG) & DEBUG_C)
1095                         sprintf(buf, " => %x", (unsigned) flow);
1096                         strcat(logbuf, buf);
1097                         my_log(LOG_DEBUG, "%s", logbuf);
1098 #endif
1099
1100 #if ((DEBUG) & DEBUG_I)
1101                         pkts_pending++;
1102                         pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1103                         if (pending_queue_trace < pending_queue_trace_candidate)
1104                                 pending_queue_trace = pending_queue_trace_candidate;
1105 #endif
1106
1107                         /* Flow complete - inform unpending_thread() about it */
1108                         pending_head->flags |= FLOW_PENDING;
1109                         pending_head = pending_head->next;
1110                 done:
1111                         pthread_cond_signal(&unpending_cond);
1112                 }
1113         }
1114         return 0;
1115 }
1116
1117 int main(int argc, char **argv)
1118 {
1119         char errpbuf[512];
1120         char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1121         int c, i, write_fd, memory_limit = 0;
1122         struct addrinfo hints, *res;
1123         struct sockaddr_in saddr;
1124         pthread_attr_t tattr;
1125         struct sigaction sigact;
1126         static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1127         struct timeval timeout;
1128
1129         sched_min = sched_get_priority_min(SCHED);
1130         sched_max = sched_get_priority_max(SCHED);
1131
1132         memset(&saddr, 0 , sizeof(saddr));
1133         memset(&hints, 0 , sizeof(hints));
1134         hints.ai_flags = AI_PASSIVE;
1135         hints.ai_family = AF_INET;
1136         hints.ai_socktype = SOCK_DGRAM;
1137
1138         /* Process command line options */
1139
1140         opterr = 0;
1141         while ((c = my_getopt(argc, argv, parms)) != -1) {
1142                 switch (c) {
1143                         case '?':
1144                                 usage();
1145
1146                         case 'h':
1147                                 usage();
1148                 }
1149         }
1150
1151         if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1152         if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1153         if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1154         if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1155         if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1156         if (parms[nflag].count) {
1157                 switch (atoi(parms[nflag].arg)) {
1158                         case 1:
1159                                 netflow = &NetFlow1;
1160                                 break;
1161
1162                         case 5:
1163                                 break;
1164
1165                         case 7:
1166                                 netflow = &NetFlow7;
1167                                 break;
1168
1169                         default:
1170                                 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1171                                 exit(1);
1172                 }
1173         }
1174         if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1175         if (parms[lflag].count) {
1176                 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1177                         *log_suffix++ = 0;
1178                         if (*log_suffix) {
1179                                 sprintf(errpbuf, "[%s]", log_suffix);
1180                                 strcat(ident, errpbuf);
1181                         }
1182                 }
1183                 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1184                 if (log_suffix) *--log_suffix = ':';
1185         }
1186         if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1187         err_malloc:
1188                 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1189                 exit(1);
1190         }
1191         sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1192         if (parms[qflag].count) {
1193                 pending_queue_length = atoi(parms[qflag].arg);
1194                 if (pending_queue_length < 1) {
1195                         fprintf(stderr, "Illegal %s\n", "pending queue length");
1196                         exit(1);
1197                 }
1198         }
1199         if (parms[rflag].count) {
1200                 schedp.sched_priority = atoi(parms[rflag].arg);
1201                 if (schedp.sched_priority
1202                         && (schedp.sched_priority < sched_min
1203                                 || schedp.sched_priority > sched_max)) {
1204                         fprintf(stderr, "Illegal %s\n", "realtime priority");
1205                         exit(1);
1206                 }
1207         }
1208         if (parms[Bflag].count) {
1209                 sockbufsize = atoi(parms[Bflag].arg) << 10;
1210         }
1211         if (parms[bflag].count) {
1212                 bulk_quantity = atoi(parms[bflag].arg);
1213                 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1214                         fprintf(stderr, "Illegal %s\n", "bulk size");
1215                         exit(1);
1216                 }
1217         }
1218         if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1219         if (parms[Xflag].count) {
1220                 for(i = 0; parms[Xflag].arg[i]; i++)
1221                         if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1222                 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1223                         goto err_malloc;
1224                 rule = strtok(parms[Xflag].arg, ":");
1225                 for (i = 0; rule; i++) {
1226                         snmp_rules[i].len = strlen(rule);
1227                         if (snmp_rules[i].len > IFNAMSIZ) {
1228                                 fprintf(stderr, "Illegal %s\n", "interface basename");
1229                                 exit(1);
1230                         }
1231                         strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1232                         if (!*(rule - 1)) *(rule - 1) = ',';
1233                         rule = strtok(NULL, ",");
1234                         if (!rule) {
1235                                 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1236                                 exit(1);
1237                         }
1238                         snmp_rules[i].base = atoi(rule);
1239                         *(rule - 1) = ':';
1240                         rule = strtok(NULL, ":");
1241                 }
1242                 nsnmp_rules = i;
1243         }
1244         if (parms[tflag].count)
1245                 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1246         if (parms[aflag].count) {
1247                 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1248                 bad_lhost:
1249                         fprintf(stderr, "Illegal %s\n", "source address");
1250                         exit(1);
1251                 } else {
1252                         saddr = *((struct sockaddr_in *) res->ai_addr);
1253                         freeaddrinfo(res);
1254                 }
1255         }
1256         if (parms[uflag].count) 
1257                 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1258                         fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1259                         exit(1);
1260                 }
1261
1262
1263         /* Process collectors parameters. Brrrr... :-[ */
1264
1265         npeers = argc - optind;
1266         if (npeers > 1) {
1267                 /* Send to remote Netflow collector */
1268                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1269                 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1270                         dhost = argv[i];
1271                         if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1272                         *dport++ = 0;
1273                         if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1274                                 fprintf(stderr, "socket(): %s\n", strerror(errno));
1275                                 exit(1);
1276                         }
1277                         peers[npeers].write_fd = write_fd;
1278                         peers[npeers].type = PEER_MIRROR;
1279                         peers[npeers].laddr = saddr;
1280                         peers[npeers].seq = 0;
1281                         if ((lhost = strchr(dport, '/'))) {
1282                                 *lhost++ = 0;
1283                                 if ((type = strchr(lhost, '/'))) {
1284                                         *type++ = 0;
1285                                         switch (*type) {
1286                                                 case 0:
1287                                                 case 'm':
1288                                                         break;
1289
1290                                                 case 'r':
1291                                                         peers[npeers].type = PEER_ROTATE;
1292                                                         npeers_rot++;
1293                                                         break;
1294
1295                                                 default:
1296                                                         goto bad_collector;
1297                                         }
1298                                 }
1299                                 if (*lhost) {
1300                                         if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1301                                         peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1302                                         freeaddrinfo(res);
1303                                 }
1304                         }
1305                         if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1306                                                 sizeof(struct sockaddr_in))) {
1307                                 fprintf(stderr, "bind(): %s\n", strerror(errno));
1308                                 exit(1);
1309                         }
1310                         if (getaddrinfo(dhost, dport, &hints, &res)) {
1311 bad_collector:
1312                                 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1313                                 exit(1);
1314                         }
1315                         peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1316                         freeaddrinfo(res);
1317                         if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1318                                                 sizeof(struct sockaddr_in))) {
1319                                 fprintf(stderr, "connect(): %s\n", strerror(errno));
1320                                 exit(1);
1321                         }
1322
1323                         /* Restore command line */
1324                         if (type) *--type = '/';
1325                         if (lhost) *--lhost = '/';
1326                         *--dport = ':';
1327                 }
1328         }
1329         else if (parms[fflag].count) {
1330                 // log into a file
1331                 char *fname;
1332                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1333                 fname = parms[fflag].arg;
1334                 if ((write_fd = open(fname, O_WRONLY|O_CREAT)) < 0) {
1335                         fprintf(stderr, "open(): %s (%s)\n", fname, strerror(errno));
1336                         exit(1);
1337                 }
1338                 peers[0].write_fd = write_fd;
1339                 peers[0].type = PEER_FILE;
1340                 peers[0].seq = 0;
1341                 npeers++;
1342         }
1343         else 
1344                 usage();
1345
1346
1347         if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1348         ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1349         if (!ulog_handle) {
1350                 fprintf(stderr, "libipulog initialization error: %s",
1351                         ipulog_strerror(ipulog_errno));
1352                 exit(1);
1353         }
1354         if (sockbufsize)
1355                 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1356                         &sockbufsize, sizeof(sockbufsize)) < 0)
1357                         fprintf(stderr, "setsockopt(): %s", strerror(errno));
1358
1359         /* Daemonize (if log destination stdout-free) */
1360
1361         my_log_open(ident, verbosity, log_dest);
1362         if (!(log_dest & 2)) {
1363                 switch (fork()) {
1364                         case -1:
1365                                 fprintf(stderr, "fork(): %s", strerror(errno));
1366                                 exit(1);
1367
1368                         case 0:
1369                                 setsid();
1370                                 freopen("/dev/null", "r", stdin);
1371                                 freopen("/dev/null", "w", stdout);
1372                                 freopen("/dev/null", "w", stderr);
1373                                 break;
1374
1375                         default:
1376                                 exit(0);
1377                 }
1378         } else {
1379                 setvbuf(stdout, (char *)0, _IONBF, 0);
1380                 setvbuf(stderr, (char *)0, _IONBF, 0);
1381         }
1382
1383         pid = getpid();
1384         sprintf(errpbuf, "[%ld]", (long) pid);
1385         strcat(ident, errpbuf);
1386
1387         /* Initialization */
1388
1389         hash_init(); /* Actually for crc16 only */
1390         mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1391         for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1392
1393 #ifdef UPTIME_TRICK
1394         /* Hope 12 days is enough :-/ */
1395         start_time_offset = 1 << 20;
1396
1397         /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1398 #endif
1399         gettime(&start_time);
1400
1401         /*
1402         Build static pending queue as circular buffer.
1403         */
1404         if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1405         pending_tail = pending_head;
1406         for (i = pending_queue_length - 1; i--;) {
1407                 if (!(pending_tail->next = mem_alloc())) {
1408                 err_mem_alloc:
1409                         my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1410                         exit(1);
1411                 }
1412                 pending_tail = pending_tail->next;
1413         }
1414         pending_tail->next = pending_head;
1415         pending_tail = pending_head;
1416
1417         sigemptyset(&sig_mask);
1418         sigact.sa_handler = &sighandler;
1419         sigact.sa_mask = sig_mask;
1420         sigact.sa_flags = 0;
1421         sigaddset(&sig_mask, SIGTERM);
1422         sigaction(SIGTERM, &sigact, 0);
1423 #if ((DEBUG) & DEBUG_I)
1424         sigaddset(&sig_mask, SIGUSR1);
1425         sigaction(SIGUSR1, &sigact, 0);
1426 #endif
1427         if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1428                 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1429                 exit(1);
1430         }
1431
1432         my_log(LOG_INFO, "Starting %s...", VERSION);
1433
1434         if (parms[cflag].count) {
1435                 if (chdir(parms[cflag].arg) || chroot(".")) {
1436                         my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1437                         exit(1);
1438                 }
1439         }
1440
1441         schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1442         pthread_attr_init(&tattr);
1443         for (i = 0; i < THREADS - 1; i++) {
1444                 if (schedp.sched_priority > 0) {
1445                         if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1446                                 (pthread_attr_setschedparam(&tattr, &schedp))) {
1447                                 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1448                                 exit(1);
1449                         }
1450                 }
1451                 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1452                         my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1453                         exit(1);
1454                 }
1455                 pthread_detach(thid);
1456                 schedp.sched_priority++;
1457         }
1458
1459         if (pw) {
1460                 if (setgroups(0, NULL)) {
1461                         my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1462                         exit(1);
1463                 }
1464                 if (setregid(pw->pw_gid, pw->pw_gid)) {
1465                         my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1466                         exit(1);
1467                 }
1468                 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1469                         my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1470                         exit(1);
1471                 }
1472         }
1473
1474         if (!(pidfile = fopen(pidfilepath, "w")))
1475                 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1476         else {
1477                 fprintf(pidfile, "%ld\n", (long) pid);
1478                 fclose(pidfile);
1479         }
1480
1481         my_log(LOG_INFO, "pid: %d", pid);
1482         my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1483                 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1484                 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1485                 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1486                 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1487                 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1488                 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1489         for (i = 0; i < nsnmp_rules; i++) {
1490                 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1491                         i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1492         }
1493         for (i = 0; i < npeers; i++) {
1494                 switch (peers[i].type) {
1495                         case PEER_MIRROR:
1496                                 c = 'm';
1497                                 break;
1498                         case PEER_ROTATE:
1499                                 c = 'r';
1500                                 break;
1501                 }
1502                 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1503                 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1504                         inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1505         }
1506
1507         pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1508
1509         timeout.tv_usec = 0;
1510         while (!killed
1511                 || (total_elements - free_elements - pending_queue_length)
1512                 || emit_count
1513                 || pending_tail->flags) {
1514
1515                 if (!sigs) {
1516                         timeout.tv_sec = scan_interval;
1517                         select(0, 0, 0, 0, &timeout);
1518                 }
1519
1520                 if (sigs & SIGTERM_MASK && !killed) {
1521                         sigs &= ~SIGTERM_MASK;
1522                         my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1523                         scan_interval = 1;
1524                         frag_lifetime = -1;
1525                         active_lifetime = -1;
1526                         inactive_lifetime = -1;
1527                         emit_timeout = 1;
1528                         unpending_timeout = 1;
1529                         killed = 1;
1530                         pthread_cond_signal(&scan_cond);
1531                         pthread_cond_signal(&unpending_cond);
1532                 }
1533
1534 #if ((DEBUG) & DEBUG_I)
1535                 if (sigs & SIGUSR1_MASK) {
1536                         sigs &= ~SIGUSR1_MASK;
1537                         info_debug();
1538                 }
1539 #endif
1540         }
1541         remove(pidfilepath);
1542 #if ((DEBUG) & DEBUG_I)
1543         info_debug();
1544 #endif
1545         my_log(LOG_INFO, "Done.");
1546 #ifdef WALL
1547         return 0;
1548 #endif
1549 }