Split up log file into epochs so that we can throw away old data.
[fprobe-ulog.git] / src / fprobe-ulog.c
1 /*
2         Copyright (C) Slava Astashonok <sla@0n.ru>
3
4         This program is free software; you can redistribute it and/or
5         modify it under the terms of the GNU General Public License.
6
7         $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
8 */
9
10 #include <common.h>
11
12 /* stdout, stderr, freopen() */
13 #include <stdio.h>
14
15 /* atoi(), exit() */
16 #include <stdlib.h>
17
18 /* getopt(), alarm(), getpid(), setsid(), chdir() */
19 #include <unistd.h>
20
21 /* strerror() */
22 #include <string.h>
23
24 /* sig*() */
25 #include <signal.h>
26
27 #include <libipulog/libipulog.h>
/*
 * Full definition of the ULOG handle type, written out in this file right
 * after including <libipulog/libipulog.h>.
 * NOTE(review): presumably this mirrors the library's private structure so
 * the probe can reach the members (e.g. the socket fd) directly -- the field
 * order and types must then match the installed libipulog; confirm on
 * library upgrades.
 */
struct ipulog_handle {
	int fd;
	u_int8_t blocking;
	struct sockaddr_nl local;
	struct sockaddr_nl peer;
	struct nlmsghdr* last_nlhdr;
};
35
36 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
37 #include <sys/types.h>
38 #include <netinet/in_systm.h>
39 #include <sys/socket.h>
40 #include <netinet/in.h>
41 #include <arpa/inet.h>
42 #include <netinet/ip.h>
43 #include <netinet/tcp.h>
44 #include <netinet/udp.h>
45 #include <netinet/ip_icmp.h>
46 #include <net/if.h>
47
48 #include <sys/param.h>
49 #include <pwd.h>
50 #ifdef OS_LINUX
51 #include <grp.h>
52 #endif
53
54 /* pthread_*() */
55 #include <pthread.h>
56
57 /* errno */
58 #include <errno.h>
59
60 /* getaddrinfo() */
61 #include <netdb.h>
62
63 /* nanosleep() */
64 #include <time.h>
65
66 /* gettimeofday() */
67 #include <sys/time.h>
68
69 /* scheduling */
70 #include <sched.h>
71
72 /* select() (POSIX)*/
73 #include <sys/select.h>
74
75 /* open() */
76 #include <sys/stat.h>
77 #include <fcntl.h>
78
79 #include <fprobe-ulog.h>
80 #include <my_log.h>
81 #include <my_getopt.h>
82 #include <netflow.h>
83 #include <hash.h>
84 #include <mem.h>
85
/*
 * Symbolic indices into parms[] (e.g. parms[Mflag]).  The enumerators are
 * listed in exactly the same order as the entries of parms[]; the two lists
 * must be kept in step when an option is added or removed.
 */
enum {
	aflag,
	Bflag,
	bflag,
	cflag,
	dflag,
	eflag,
	Eflag,
	fflag,
	gflag,
	hflag,
	lflag,
	mflag,
	Mflag,
	nflag,
	qflag,
	rflag,
	sflag,
	tflag,
	Tflag,
	Uflag,
	uflag,
	vflag,
	Xflag,
};
111
/*
 * Command-line option table, one entry per option letter, terminated by an
 * all-zero entry.  The first member is the option character; the second is
 * MY_GETOPT_ARG_REQUIRED for options that take an argument and 0 for plain
 * switches (-h, -M).  The remaining members start out as 0.
 * NOTE(review): they are presumably filled in by the my_getopt parser (this
 * file reads parms[Mflag].count) -- confirm against my_getopt.h.
 *
 * Entry order must match the *flag enum above.  usage() additionally
 * mentions -G and -y, which have no entry in this table.
 */
static struct getopt_parms parms[] = {
	{'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
	{'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
	{'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
	{'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
	{'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
	{'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
	{'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
	{'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
	{'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
	{'h', 0, 0, 0},
	{'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
	{'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
	{'M', 0, 0, 0},
	{'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
	{'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
	{'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
	{'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
	{'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
	{'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
	{'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
	{'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
	{'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
	{'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
	{0, 0, 0, 0}
};
138
/* getopt() state provided by the C library */
extern char *optarg;
extern int optind, opterr, optopt;
/*
 * errno is deliberately NOT redeclared here: <errno.h> (included above)
 * provides it, usually as a macro that expands to a per-thread lvalue.
 * Declaring "extern int errno" yourself is undefined behaviour and, on
 * systems where it does name a real global, would make the threads of this
 * program read the wrong variable.
 */
142
/* NetFlow export format descriptors, defined outside this file */
extern struct NetFlow NetFlow1;
extern struct NetFlow NetFlow5;
extern struct NetFlow NetFlow7;

/* Shorthand for the field of the -M entry in the option table */
#define mark_is_tos parms[Mflag].count
/* Defaults below match the bracketed values printed by usage() */
static unsigned scan_interval = 5;	/* -s: seconds between scans for expired flows */
static int frag_lifetime = 30;		/* -g: fragmented flow lifetime, seconds */
static int inactive_lifetime = 60;	/* NOTE(review): presumably set by -d; not listed in usage() */
static int active_lifetime = 300;	/* -e: active flow lifetime, seconds */
static int sockbufsize;
/* Largest value representable in mem_index_t, used as the upper bound for -b */
#define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
#if (MEM_BITS == 0) || (MEM_BITS == 16)
#define BULK_QUANTITY 10000
#else
#define BULK_QUANTITY 200
#endif

/*
 * Log-file rotation state used by get_log_fd().
 * epoch_length is multiplied by 1000 and compared against a millisecond
 * uptime there, i.e. it is treated as seconds.
 * NOTE(review): usage() documents -E as minutes -- confirm which unit the
 * option parser stores.
 * log_epochs is the number of epoch files cycled through; cur_epoch is the
 * index of the current one; prev_uptime is the uptime at the last rotation.
 */
static unsigned epoch_length=60, log_epochs=1;
static unsigned cur_epoch=0,prev_uptime=0;

static unsigned bulk_quantity = BULK_QUANTITY;	/* -b: memory bulk size, in flows */
static unsigned pending_queue_length = 100;	/* -q: pending queue length, in flows */
static struct NetFlow *netflow = &NetFlow5;	/* export format in use (version 5 by default) */
static unsigned verbosity = 6;			/* -v: maximum log level */
static unsigned log_dest = MY_LOG_SYSLOG;
static struct Time start_time;			/* reference point for getuptime() */
static long start_time_offset;
static int off_tl;
/* From mem.c */
extern unsigned total_elements;
extern unsigned free_elements;
extern unsigned total_memory;
#if ((DEBUG) & DEBUG_I)
/* Statistics counters, only compiled in (and reported by info_debug()) with DEBUG_I */
static unsigned emit_pkts, emit_queue;
static uint64_t size_total;
static unsigned pkts_total, pkts_total_fragmented;
static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
static unsigned pkts_pending, pkts_pending_done;
static unsigned pending_queue_trace, pending_queue_trace_candidate;
static unsigned flows_total, flows_fragmented;
#endif
/* State of the packet currently being assembled by emit_thread() */
static unsigned emit_count;			/* flow records already placed in emit_packet */
static uint32_t emit_sequence;			/* running total of flows emitted */
static unsigned emit_rate_bytes, emit_rate_delay;	/* -t <B:N> rate limit */
static struct Time emit_time;			/* timestamp taken just before a packet is sent */
static uint8_t emit_packet[NETFLOW_MAX_PACKET];	/* export packet buffer */
static pthread_t thid;
static sigset_t sig_mask;
static struct sched_param schedp;
static int sched_min, sched_max;		/* priority range printed by usage() for -r */
static int npeers, npeers_rot;			/* number of peers / modulus for the rotate index */
static struct peer *peers;			/* export destinations walked by emit_thread() */
static int sigs;				/* bitmask of signals seen by sighandler() */

/* Flow cache: hash buckets of singly linked flows, one mutex per bucket */
static struct Flow *flows[1 << HASH_BITS];
static pthread_mutex_t flows_mutex[1 << HASH_BITS];

/* Held by unpending_thread() while it waits for pending flows */
static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;

static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
static struct Flow *pending_head, *pending_tail;
static struct Flow *scan_frag_dreg;

/* emit_mutex/emit_cond protect and signal flows_emit, the list emit_thread() consumes */
static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
static struct Flow *flows_emit;

static char ident[256] = "fprobe-ulog";
static FILE *pidfile;
static char *pidfilepath;
static pid_t pid;
static int killed;
static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
static struct ipulog_handle *ulog_handle;
static uint32_t ulog_gmask = 1;			/* -U: ULOG group bitwise mask */
static char *cap_buf;
/* Interface-name to SNMP-index rules consulted by snmp_index() */
static int nsnmp_rules;
static struct snmp_rule *snmp_rules;
static struct passwd *pw = 0;			/* account setuser() switches to; 0 = keep current */
224
225 void usage()
226 {
227         fprintf(stdout,
228                 "fprobe-ulog: a NetFlow probe. Version %s\n"
229                 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
230                 "\n"
231                 "-h\t\tDisplay this help\n"
232                 "-U <mask>\tULOG group bitwise mask [1]\n"
233                 "-s <seconds>\tHow often scan for expired flows [5]\n"
234                 "-g <seconds>\tFragmented flow lifetime [30]\n"
235                 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
236                 "-f <filename>\tLog flow data in a file\n"
237                 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
238                 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
239                 "-a <address>\tUse <address> as source for NetFlow flow\n"
240                 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
241                 "-M\t\tUse netfilter mark value as ToS flag\n"
242                 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
243                 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
244                 "-q <flows>\tPending queue length [100]\n"
245                 "-B <kilobytes>\tKernel capture buffer size [0]\n"
246                 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
247                 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
248                 "-c <directory>\tDirectory to chroot to\n"
249                 "-u <user>\tUser to run as\n"
250                 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
251                 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
252                 "-y <remote:port>\tAddress of the NetFlow collector\n",
253                 "-f <writable file>\tFile to write data into\n"
254                 "-T <n>\tRotate log file every n epochs\n"
255                 "-E <[1..60]>\tSize of an epoch in minutes\n"
256                 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
257         exit(0);
258 }
259
#if ((DEBUG) & DEBUG_I)
/*
 * Dump the DEBUG_I statistics counters to the log at LOG_DEBUG level:
 * packet counts, cache/emit counts and allocator figures from mem.c.
 * Only compiled when DEBUG_I is enabled.
 */
void info_debug()
{
	/*
	 * size_total is uint64_t, which is not necessarily "long long";
	 * cast it so the argument really matches the %lld conversion.
	 */
	my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
		pkts_total, pkts_total_fragmented, (long long) size_total,
		pkts_pending - pkts_pending_done, pending_queue_trace);
	my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
		pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
	my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
		flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
	my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
		total_elements, free_elements, total_memory);
}
#endif
274
275 void sighandler(int sig)
276 {
277         switch (sig) {
278                 case SIGTERM:
279                         sigs |= SIGTERM_MASK;
280                         break;
281 #if ((DEBUG) & DEBUG_I)
282                 case SIGUSR1:
283                         sigs |= SIGUSR1_MASK;
284                         break;
285 #endif
286         }
287 }
288
289 void gettime(struct Time *now)
290 {
291         struct timeval t;
292
293         gettimeofday(&t, 0);
294         now->sec = t.tv_sec;
295         now->usec = t.tv_usec;
296 }
297
298 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
299 {
300         return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
301 }
302
303 /* Uptime in miliseconds */
304 uint32_t getuptime(struct Time *t)
305 {
306         /* Maximum uptime is about 49/2 days */
307         return cmpmtime(t, &start_time);
308 }
309
310 hash_t hash_flow(struct Flow *flow)
311 {
312         if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
313         else return hash(flow, sizeof(struct Flow_TL));
314 }
315
316 uint16_t snmp_index(char *name) {
317         uint32_t i;
318
319         if (!*name) return 0;
320
321         for (i = 0; (int) i < nsnmp_rules; i++) {
322                 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
323                 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
324         }
325
326         if ((i = if_nametoindex(name))) return i;
327
328         return -1;
329 }
330
/*
 * Copy the listed members of *src into *dst, one by one.  dst->next is not
 * assigned here, so the destination's list linkage is left as it was.
 */
inline void copy_flow(struct Flow *src, struct Flow *dst)
{
	/* interfaces, addresses and protocol-level members */
	dst->iif = src->iif;
	dst->oif = src->oif;
	dst->sip = src->sip;
	dst->dip = src->dip;
	dst->tos = src->tos;
	dst->proto = src->proto;
	dst->tcp_flags = src->tcp_flags;
	dst->id = src->id;
	dst->sp = src->sp;
	dst->dp = src->dp;
	/* counters */
	dst->pkts = src->pkts;
	dst->size = src->size;
	dst->sizeF = src->sizeF;
	dst->sizeP = src->sizeP;
	/* timestamps and flag bits */
	dst->ctime = src->ctime;
	dst->mtime = src->mtime;
	dst->flags = src->flags;
}
351
352 unsigned get_log_fd(char *fname, unsigned cur_fd) {
353         struct Time now;
354         unsigned cur_uptime;
355         int ret_fd;
356         gettime(&now);
357         cur_uptime = getuptime(&now);
358         if ((cur_uptime - prev_uptime) > (1000 * epoch_length)) {
359                 char nextname[MAX_PATH_LEN];
360                 int write_fd;
361                 prev_uptime = cur_uptime;
362                 cur_epoch = (cur_epoch + 1) % log_epochs;
363                 close(cur_fd);
364                 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
365                 if ((write_fd = open(nextname, O_WRONLY|O_CREAT)) < 0) {
366                         fprintf(stderr, "open(): %s (%s)\n", nextname, strerror(errno));
367                         exit(1);
368                 }
369                 ret_fd = write_fd;
370         }
371         else
372                 ret_fd = cur_fd;
373         return(cur_fd);
374 }
375
376 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
377 {
378         struct Flow **flowpp;
379
380 #ifdef WALL
381         flowpp = 0;
382 #endif
383
384         if (prev) flowpp = *prev;
385
386         while (where) {
387                 if (where->sip.s_addr == what->sip.s_addr
388                         && where->dip.s_addr == what->dip.s_addr
389                         && where->proto == what->proto) {
390                         switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
391                                 case 0:
392                                         /* Both unfragmented */
393                                         if ((what->sp == where->sp)
394                                                 && (what->dp == where->dp)) goto done;
395                                         break;
396                                 case 2:
397                                         /* Both fragmented */
398                                         if (where->id == what->id) goto done;
399                                         break;
400                         }
401                 }
402                 flowpp = &where->next;
403                 where = where->next;
404         }
405 done:
406         if (prev) *prev = flowpp;
407         return where;
408 }
409
410 int put_into(struct Flow *flow, int flag
411 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
412         , char *logbuf
413 #endif
414 )
415 {
416         int ret = 0;
417         hash_t h;
418         struct Flow *flown, **flowpp;
419 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
420         char buf[64];
421 #endif
422
423         h = hash_flow(flow);
424 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
425         sprintf(buf, " %x H:%04x", (unsigned) flow, h);
426         strcat(logbuf, buf);
427 #endif
428         pthread_mutex_lock(&flows_mutex[h]);
429         flowpp = &flows[h];
430         if (!(flown = find(flows[h], flow, &flowpp))) {
431                 /* No suitable flow found - add */
432                 if (flag == COPY_INTO) {
433                         if ((flown = mem_alloc())) {
434                                 copy_flow(flow, flown);
435                                 flow = flown;
436                         } else {
437 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
438                                 my_log(LOG_ERR, "%s %s. %s",
439                                         "mem_alloc():", strerror(errno), "packet lost");
440 #endif
441                                 return -1;
442                         }
443                 }
444                 flow->next = flows[h];
445                 flows[h] = flow;
446 #if ((DEBUG) & DEBUG_I)
447                 flows_total++;
448                 if (flow->flags & FLOW_FRAG) flows_fragmented++;
449 #endif
450 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
451                 if (flown) {
452                         sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
453                         strcat(logbuf, buf);
454                 }
455 #endif
456         } else {
457                 /* Found suitable flow - update */
458 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
459                 sprintf(buf, " +> %x", (unsigned) flown);
460                 strcat(logbuf, buf);
461 #endif
462                 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
463                         flown->mtime = flow->mtime;
464                 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
465                         flown->ctime = flow->ctime;
466                 flown->tcp_flags |= flow->tcp_flags;
467                 flown->size += flow->size;
468                 flown->pkts += flow->pkts;
469                 if (flow->flags & FLOW_FRAG) {
470                         /* Fragmented flow require some additional work */
471                         if (flow->flags & FLOW_TL) {
472                                 /*
473                                 ?FIXME?
474                                 Several packets with FLOW_TL (attack)
475                                 */
476                                 flown->sp = flow->sp;
477                                 flown->dp = flow->dp;
478                         }
479                         if (flow->flags & FLOW_LASTFRAG) {
480                                 /*
481                                 ?FIXME?
482                                 Several packets with FLOW_LASTFRAG (attack)
483                                 */
484                                 flown->sizeP = flow->sizeP;
485                         }
486                         flown->flags |= flow->flags;
487                         flown->sizeF += flow->sizeF;
488                         if ((flown->flags & FLOW_LASTFRAG)
489                                 && (flown->sizeF >= flown->sizeP)) {
490                                 /* All fragments received - flow reassembled */
491                                 *flowpp = flown->next;
492                                 pthread_mutex_unlock(&flows_mutex[h]);
493 #if ((DEBUG) & DEBUG_I)
494                                 flows_total--;
495                                 flows_fragmented--;
496 #endif
497                                 flown->id = 0;
498                                 flown->flags &= ~FLOW_FRAG;
499 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
500                                 strcat(logbuf," R");
501 #endif
502                                 ret = put_into(flown, MOVE_INTO
503 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
504                                                 , logbuf
505 #endif
506                                         );
507                         }
508                 }
509                 if (flag == MOVE_INTO) mem_free(flow);
510         }
511         pthread_mutex_unlock(&flows_mutex[h]);
512         return ret;
513 }
514
515 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
516 {
517         int i;
518
519         for (i = 0; i < fields; i++) {
520 #if ((DEBUG) & DEBUG_F)
521                 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
522 #endif
523                 switch (format[i]) {
524                         case NETFLOW_IPV4_SRC_ADDR:
525                                 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
526                                 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
527                                 break;
528
529                         case NETFLOW_IPV4_DST_ADDR:
530                                 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
531                                 p += NETFLOW_IPV4_DST_ADDR_SIZE;
532                                 break;
533
534                         case NETFLOW_INPUT_SNMP:
535                                 *((uint16_t *) p) = htons(flow->iif);
536                                 p += NETFLOW_INPUT_SNMP_SIZE;
537                                 break;
538
539                         case NETFLOW_OUTPUT_SNMP:
540                                 *((uint16_t *) p) = htons(flow->oif);
541                                 p += NETFLOW_OUTPUT_SNMP_SIZE;
542                                 break;
543
544                         case NETFLOW_PKTS_32:
545                                 *((uint32_t *) p) = htonl(flow->pkts);
546                                 p += NETFLOW_PKTS_32_SIZE;
547                                 break;
548
549                         case NETFLOW_BYTES_32:
550                                 *((uint32_t *) p) = htonl(flow->size);
551                                 p += NETFLOW_BYTES_32_SIZE;
552                                 break;
553
554                         case NETFLOW_FIRST_SWITCHED:
555                                 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
556                                 p += NETFLOW_FIRST_SWITCHED_SIZE;
557                                 break;
558
559                         case NETFLOW_LAST_SWITCHED:
560                                 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
561                                 p += NETFLOW_LAST_SWITCHED_SIZE;
562                                 break;
563
564                         case NETFLOW_L4_SRC_PORT:
565                                 *((uint16_t *) p) = flow->sp;
566                                 p += NETFLOW_L4_SRC_PORT_SIZE;
567                                 break;
568
569                         case NETFLOW_L4_DST_PORT:
570                                 *((uint16_t *) p) = flow->dp;
571                                 p += NETFLOW_L4_DST_PORT_SIZE;
572                                 break;
573
574                         case NETFLOW_PROT:
575                                 *((uint8_t *) p) = flow->proto;
576                                 p += NETFLOW_PROT_SIZE;
577                                 break;
578
579                         case NETFLOW_SRC_TOS:
580                                 *((uint8_t *) p) = flow->tos;
581                                 p += NETFLOW_SRC_TOS_SIZE;
582                                 break;
583
584                         case NETFLOW_TCP_FLAGS:
585                                 *((uint8_t *) p) = flow->tcp_flags;
586                                 p += NETFLOW_TCP_FLAGS_SIZE;
587                                 break;
588
589                         case NETFLOW_VERSION:
590                                 *((uint16_t *) p) = htons(netflow->Version);
591                                 p += NETFLOW_VERSION_SIZE;
592                                 break;
593
594                         case NETFLOW_COUNT:
595                                 *((uint16_t *) p) = htons(emit_count);
596                                 p += NETFLOW_COUNT_SIZE;
597                                 break;
598
599                         case NETFLOW_UPTIME:
600                                 *((uint32_t *) p) = htonl(getuptime(&emit_time));
601                                 p += NETFLOW_UPTIME_SIZE;
602                                 break;
603
604                         case NETFLOW_UNIX_SECS:
605                                 *((uint32_t *) p) = htonl(emit_time.sec);
606                                 p += NETFLOW_UNIX_SECS_SIZE;
607                                 break;
608
609                         case NETFLOW_UNIX_NSECS:
610                                 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
611                                 p += NETFLOW_UNIX_NSECS_SIZE;
612                                 break;
613
614                         case NETFLOW_FLOW_SEQUENCE:
615                                 //*((uint32_t *) p) = htonl(emit_sequence);
616                                 *((uint32_t *) p) = 0;
617                                 p += NETFLOW_FLOW_SEQUENCE_SIZE;
618                                 break;
619
620                         case NETFLOW_PAD8:
621                         /* Unsupported (uint8_t) */
622                         case NETFLOW_ENGINE_TYPE:
623                         case NETFLOW_ENGINE_ID:
624                         case NETFLOW_FLAGS7_1:
625                         case NETFLOW_SRC_MASK:
626                         case NETFLOW_DST_MASK:
627                                 *((uint8_t *) p) = 0;
628                                 p += NETFLOW_PAD8_SIZE;
629                                 break;
630
631                         case NETFLOW_PAD16:
632                         /* Unsupported (uint16_t) */
633                         case NETFLOW_SRC_AS:
634                         case NETFLOW_DST_AS:
635                         case NETFLOW_FLAGS7_2:
636                                 *((uint16_t *) p) = 0;
637                                 p += NETFLOW_PAD16_SIZE;
638                                 break;
639
640                         case NETFLOW_PAD32:
641                         /* Unsupported (uint32_t) */
642                         case NETFLOW_IPV4_NEXT_HOP:
643                         case NETFLOW_ROUTER_SC:
644                                 *((uint32_t *) p) = 0;
645                                 p += NETFLOW_PAD32_SIZE;
646                                 break;
647
648                         default:
649                                 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
650                                         format, i, format[i]);
651                                 exit(1);
652                 }
653         }
654 #if ((DEBUG) & DEBUG_F)
655         my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
656 #endif
657         return p;
658 }
659
660 void setuser() {
661         /*
662         Workaround for clone()-based threads
663         Try to change EUID independently of main thread
664         */
665         if (pw) {
666                 setgroups(0, NULL);
667                 setregid(pw->pw_gid, pw->pw_gid);
668                 setreuid(pw->pw_uid, pw->pw_uid);
669         }
670 }
671
672 void *emit_thread()
673 {
674         struct Flow *flow;
675         void *p;
676         struct timeval now;
677         struct timespec timeout;
678         int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
679
680         p = (void *) &emit_packet + netflow->HeaderSize;
681         timeout.tv_nsec = 0;
682
683         setuser();
684
685         for (;;) {
686                 pthread_mutex_lock(&emit_mutex);
687                 while (!flows_emit) {
688                         gettimeofday(&now, 0);
689                         timeout.tv_sec = now.tv_sec + emit_timeout;
690                         /* Do not wait until emit_packet will filled - it may be too long */
691                         if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
692                                 pthread_mutex_unlock(&emit_mutex);
693                                 goto sendit;
694                         }
695                 }
696                 flow = flows_emit;
697                 flows_emit = flows_emit->next;
698 #if ((DEBUG) & DEBUG_I)
699                 emit_queue--;
700 #endif          
701                 pthread_mutex_unlock(&emit_mutex);
702
703 #ifdef UPTIME_TRICK
704                 if (!emit_count) {
705                         gettime(&start_time);
706                         start_time.sec -= start_time_offset;
707                 }
708 #endif
709                 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
710                 mem_free(flow);
711                 emit_count++;
712 #ifdef PF2_DEBUG
713                 printf("Emit count = %d\n", emit_count);
714                 fflush(stdout);
715 #endif
716                 if (emit_count == netflow->MaxFlows) {
717                 sendit:
718                         gettime(&emit_time);
719                         p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
720                         size = netflow->HeaderSize + emit_count * netflow->FlowSize;
721                         /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
722                         if (size < 1464) size = 1464;
723                         peer_rot_cur = 0;
724                         for (i = 0; i < npeers; i++) {
725                                 if (peers[0].type == PEER_FILE) {
726                                                 if (netflow->SeqOffset)
727                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
728 #define MESSAGES
729                                                 peers[0].write_fd = get_log_fd(peers[0].fname, peers[0].write_fd);
730                                                 ret = write(peers[0].write_fd, emit_packet, size);
731                                                 if (ret < size) {
732 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
733                                                         my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
734                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
735 #endif
736 #undef MESSAGES
737                                                 }
738 #if ((DEBUG) & DEBUG_E)
739                                                 commaneelse {
740                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
741                                                                 emit_count, i + 1, peers[i].seq);
742                                                 }
743 #endif
744                                                 peers[0].seq += emit_count;
745
746                                                 /* Rate limit */
747                                                 if (emit_rate_bytes) {
748                                                         sent += size;
749                                                         delay = sent / emit_rate_bytes;
750                                                         if (delay) {
751                                                                 sent %= emit_rate_bytes;
752                                                                 timeout.tv_sec = 0;
753                                                                 timeout.tv_nsec = emit_rate_delay * delay;
754                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
755                                                         }
756                                                 }
757                                         }
758                                 else
759                                 if (peers[i].type == PEER_MIRROR) goto sendreal;
760                                 else
761                                 if (peers[i].type == PEER_ROTATE) 
762                                         if (peer_rot_cur++ == peer_rot_work) {
763                                         sendreal:
764                                                 if (netflow->SeqOffset)
765                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
766                                                 ret = send(peers[i].write_fd, emit_packet, size, 0);
767                                                 if (ret < size) {
768 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
769                                                         my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
770                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
771 #endif
772                                                 }
773 #if ((DEBUG) & DEBUG_E)
774                                                 commaneelse {
775                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
776                                                                 emit_count, i + 1, peers[i].seq);
777                                                 }
778 #endif
779                                                 peers[i].seq += emit_count;
780
781                                                 /* Rate limit */
782                                                 if (emit_rate_bytes) {
783                                                         sent += size;
784                                                         delay = sent / emit_rate_bytes;
785                                                         if (delay) {
786                                                                 sent %= emit_rate_bytes;
787                                                                 timeout.tv_sec = 0;
788                                                                 timeout.tv_nsec = emit_rate_delay * delay;
789                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
790                                                         }
791                                                 }
792                                         }
793                         }
794                         if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
795                         emit_sequence += emit_count;
796                         emit_count = 0;
797 #if ((DEBUG) & DEBUG_I)
798                         emit_pkts++;
799 #endif
800                 }
801         }
802 }       
803
804 void *unpending_thread()
805 {
806         struct timeval now;
807         struct timespec timeout;
808 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
809         char logbuf[256];
810 #endif
811
812         setuser();
813
814         timeout.tv_nsec = 0;
815         pthread_mutex_lock(&unpending_mutex);
816
817         for (;;) {
818                 while (!(pending_tail->flags & FLOW_PENDING)) {
819                         gettimeofday(&now, 0);
820                         timeout.tv_sec = now.tv_sec + unpending_timeout;
821                         pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
822                 }
823
824 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
825                 *logbuf = 0;
826 #endif
827                 if (put_into(pending_tail, COPY_INTO
828 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
829                                 , logbuf
830 #endif
831                         ) < 0) {
832 #if ((DEBUG) & DEBUG_I)
833                         pkts_lost_unpending++;
834 #endif                          
835                 }
836
837 #if ((DEBUG) & DEBUG_U)
838                 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
839 #endif
840
841                 pending_tail->flags = 0;
842                 pending_tail = pending_tail->next;
843 #if ((DEBUG) & DEBUG_I)
844                 pkts_pending_done++;
845 #endif
846         }
847 }
848
849 void *scan_thread()
850 {
851 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
852         char logbuf[256];
853 #endif
854         int i;
855         struct Flow *flow, **flowpp;
856         struct Time now;
857         struct timespec timeout;
858
859         setuser();
860
861         timeout.tv_nsec = 0;
862         pthread_mutex_lock(&scan_mutex);
863
864         for (;;) {
865                 gettime(&now);
866                 timeout.tv_sec = now.sec + scan_interval;
867                 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
868
869                 gettime(&now);
870 #if ((DEBUG) & DEBUG_S)
871                 my_log(LOG_DEBUG, "S: %d", now.sec);
872 #endif
873                 for (i = 0; i < 1 << HASH_BITS ; i++) {
874                         pthread_mutex_lock(&flows_mutex[i]);
875                         flow = flows[i];
876                         flowpp = &flows[i];
877                         while (flow) {
878                                 if (flow->flags & FLOW_FRAG) {
879                                         /* Process fragmented flow */
880                                         if ((now.sec - flow->mtime.sec) > frag_lifetime) {
881                                                 /* Fragmented flow expired - put it into special chain */
882 #if ((DEBUG) & DEBUG_I)
883                                                 flows_fragmented--;
884                                                 flows_total--;
885 #endif
886                                                 *flowpp = flow->next;
887                                                 flow->id = 0;
888                                                 flow->flags &= ~FLOW_FRAG;
889                                                 flow->next = scan_frag_dreg;
890                                                 scan_frag_dreg = flow;
891                                                 flow = *flowpp;
892                                                 continue;
893                                         }
894                                 } else {
895                                         /* Flow is not frgamented */
896                                         if ((now.sec - flow->mtime.sec) > inactive_lifetime
897                                                 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
898                                                 /* Flow expired */
899 #if ((DEBUG) & DEBUG_S)
900                                                 my_log(LOG_DEBUG, "S: E %x", flow);
901 #endif
902 #if ((DEBUG) & DEBUG_I)
903                                                 flows_total--;
904 #endif
905                                                 *flowpp = flow->next;
906                                                 pthread_mutex_lock(&emit_mutex);
907                                                 flow->next = flows_emit;
908                                                 flows_emit = flow;
909 #if ((DEBUG) & DEBUG_I)
910                                                 emit_queue++;
911 #endif                          
912                                                 pthread_mutex_unlock(&emit_mutex);
913                                                 flow = *flowpp;
914                                                 continue;
915                                         }
916                                 }
917                                 flowpp = &flow->next;
918                                 flow = flow->next;
919                         } /* chain loop */
920                         pthread_mutex_unlock(&flows_mutex[i]);
921                 } /* hash loop */
922                 if (flows_emit) pthread_cond_signal(&emit_cond);
923
924                 while (scan_frag_dreg) {
925                         flow = scan_frag_dreg;
926                         scan_frag_dreg = flow->next;
927 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
928                         *logbuf = 0;
929 #endif
930                         put_into(flow, MOVE_INTO
931 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
932                                 , logbuf
933 #endif
934                         );
935 #if ((DEBUG) & DEBUG_S)
936                         my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
937 #endif
938                 }
939         }
940 }
941
942 void *cap_thread()
943 {
944         struct ulog_packet_msg *ulog_msg;
945         struct ip *nl;
946         void *tl;
947         struct Flow *flow;
948         int len, off_frag, psize;
949 #if ((DEBUG) & DEBUG_C)
950         char buf[64];
951         char logbuf[256];
952 #endif
953
954         setuser();
955
956         while (!killed) {
957                 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
958                 if (len <= 0) {
959                         my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
960                         continue;
961                 }
962                 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
963
964 #if ((DEBUG) & DEBUG_C)
965                         sprintf(logbuf, "C: %d", ulog_msg->data_len);
966 #endif
967
968                         nl = (void *) &ulog_msg->payload;
969                         psize = ulog_msg->data_len;
970
971                         /* Sanity check */
972                         if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
973 #if ((DEBUG) & DEBUG_C)
974                                 strcat(logbuf, " U");
975                                 my_log(LOG_DEBUG, "%s", logbuf);
976 #endif
977 #if ((DEBUG) & DEBUG_I)
978                                 pkts_ignored++;
979 #endif
980                                 continue;
981                         }
982
983                         if (pending_head->flags) {
984 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
985                                 my_log(LOG_ERR,
986 # if ((DEBUG) & DEBUG_C)
987                                         "%s %s %s", logbuf,
988 # else
989                                         "%s %s",
990 # endif
991                                         "pending queue full:", "packet lost");
992 #endif
993 #if ((DEBUG) & DEBUG_I)
994                                 pkts_lost_capture++;
995 #endif
996                                 goto done;
997                         }
998
999 #if ((DEBUG) & DEBUG_I)
1000                         pkts_total++;
1001 #endif
1002
1003                         flow = pending_head;
1004
1005                         /* ?FIXME? Add sanity check for ip_len? */
1006                         flow->size = ntohs(nl->ip_len);
1007 #if ((DEBUG) & DEBUG_I)
1008                         size_total += flow->size;
1009 #endif
1010
1011                         flow->sip = nl->ip_src;
1012                         flow->dip = nl->ip_dst;
1013                         flow->iif = snmp_index(ulog_msg->indev_name);
1014                         flow->oif = snmp_index(ulog_msg->outdev_name);
1015                         flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1016                         flow->proto = nl->ip_p;
1017                         flow->id = 0;
1018                         flow->tcp_flags = 0;
1019                         flow->pkts = 1;
1020                         flow->sizeF = 0;
1021                         flow->sizeP = 0;
1022                         /* Packets captured from OUTPUT table didn't contains valid timestamp */
1023                         if (ulog_msg->timestamp_sec) {
1024                                 flow->ctime.sec = ulog_msg->timestamp_sec;
1025                                 flow->ctime.usec = ulog_msg->timestamp_usec;
1026                         } else gettime(&flow->ctime);
1027                         flow->mtime = flow->ctime;
1028
1029                         off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1030
1031                         /*
1032                         Offset (from network layer) to transport layer header/IP data
1033                         IOW IP header size ;-)
1034
1035                         ?FIXME?
1036                         Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1037                         */
1038                         off_tl = nl->ip_hl << 2;
1039                         tl = (void *) nl + off_tl;
1040
1041                         /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1042                         flow->sizeF = ntohs(nl->ip_len) - off_tl;
1043                         psize -= off_tl;
1044                         if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1045                         if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1046
1047                         if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1048                                 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1049 #if ((DEBUG) & DEBUG_C)
1050                                 strcat(logbuf, " F");
1051 #endif
1052 #if ((DEBUG) & DEBUG_I)
1053                                 pkts_total_fragmented++;
1054 #endif
1055                                 flow->flags |= FLOW_FRAG;
1056                                 flow->id = nl->ip_id;
1057
1058                                 if (!(ntohs(nl->ip_off) & IP_MF)) {
1059                                         /* Packet whith IP_MF contains information about whole datagram size */
1060                                         flow->flags |= FLOW_LASTFRAG;
1061                                         /* size = frag_offset*8 + data_size */
1062                                         flow->sizeP = off_frag + flow->sizeF;
1063                                 }
1064                         }
1065
1066 #if ((DEBUG) & DEBUG_C)
1067                         sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1068                         strcat(logbuf, buf);
1069                         sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1070                         strcat(logbuf, buf);
1071 #endif
1072
1073                         /*
1074                         Fortunately most interesting transport layer information fit
1075                         into first 8 bytes of IP data field (minimal nonzero size).
1076                         Thus we don't need actual packet reassembling to build whole
1077                         transport layer data. We only check the fragment offset for
1078                         zero value to find packet with this information.
1079                         */
1080                         if (!off_frag && psize >= 8) {
1081                                 switch (flow->proto) {
1082                                         case IPPROTO_TCP:
1083                                         case IPPROTO_UDP:
1084                                                 flow->sp = ((struct udphdr *)tl)->uh_sport;
1085                                                 flow->dp = ((struct udphdr *)tl)->uh_dport;
1086                                                 goto tl_known;
1087
1088 #ifdef ICMP_TRICK
1089                                         case IPPROTO_ICMP:
1090                                                 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1091                                                 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1092                                                 goto tl_known;
1093 #endif
1094 #ifdef ICMP_TRICK_CISCO
1095                                         case IPPROTO_ICMP:
1096                                                 flow->dp = *((int32_t *) tl);
1097                                                 goto tl_known;
1098 #endif
1099
1100                                         default:
1101                                                 /* Unknown transport layer */
1102 #if ((DEBUG) & DEBUG_C)
1103                                                 strcat(logbuf, " U");
1104 #endif
1105                                                 flow->sp = 0;
1106                                                 flow->dp = 0;
1107                                                 break;
1108
1109                                         tl_known:
1110 #if ((DEBUG) & DEBUG_C)
1111                                                 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1112                                                 strcat(logbuf, buf);
1113 #endif
1114                                                 flow->flags |= FLOW_TL;
1115                                 }
1116                         }
1117
1118                         /* Check for tcp flags presence (including CWR and ECE). */
1119                         if (flow->proto == IPPROTO_TCP
1120                                 && off_frag < 16
1121                                 && psize >= 16 - off_frag) {
1122                                 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1123 #if ((DEBUG) & DEBUG_C)
1124                                 sprintf(buf, " TCP:%x", flow->tcp_flags);
1125                                 strcat(logbuf, buf);
1126 #endif
1127                         }
1128
1129 #if ((DEBUG) & DEBUG_C)
1130                         sprintf(buf, " => %x", (unsigned) flow);
1131                         strcat(logbuf, buf);
1132                         my_log(LOG_DEBUG, "%s", logbuf);
1133 #endif
1134
1135 #if ((DEBUG) & DEBUG_I)
1136                         pkts_pending++;
1137                         pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1138                         if (pending_queue_trace < pending_queue_trace_candidate)
1139                                 pending_queue_trace = pending_queue_trace_candidate;
1140 #endif
1141
1142                         /* Flow complete - inform unpending_thread() about it */
1143                         pending_head->flags |= FLOW_PENDING;
1144                         pending_head = pending_head->next;
1145                 done:
1146                         pthread_cond_signal(&unpending_cond);
1147                 }
1148         }
1149         return 0;
1150 }
1151
1152 int main(int argc, char **argv)
1153 {
1154         char errpbuf[512];
1155         char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1156         int c, i, write_fd, memory_limit = 0;
1157         struct addrinfo hints, *res;
1158         struct sockaddr_in saddr;
1159         pthread_attr_t tattr;
1160         struct sigaction sigact;
1161         static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1162         struct timeval timeout;
1163
1164         sched_min = sched_get_priority_min(SCHED);
1165         sched_max = sched_get_priority_max(SCHED);
1166
1167         memset(&saddr, 0 , sizeof(saddr));
1168         memset(&hints, 0 , sizeof(hints));
1169         hints.ai_flags = AI_PASSIVE;
1170         hints.ai_family = AF_INET;
1171         hints.ai_socktype = SOCK_DGRAM;
1172
1173         /* Process command line options */
1174
1175         opterr = 0;
1176         while ((c = my_getopt(argc, argv, parms)) != -1) {
1177                 switch (c) {
1178                         case '?':
1179                                 usage();
1180
1181                         case 'h':
1182                                 usage();
1183                 }
1184         }
1185
1186         if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1187         if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1188         if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1189         if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1190         if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1191         if (parms[nflag].count) {
1192                 switch (atoi(parms[nflag].arg)) {
1193                         case 1:
1194                                 netflow = &NetFlow1;
1195                                 break;
1196
1197                         case 5:
1198                                 break;
1199
1200                         case 7:
1201                                 netflow = &NetFlow7;
1202                                 break;
1203
1204                         default:
1205                                 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1206                                 exit(1);
1207                 }
1208         }
1209         if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1210         if (parms[lflag].count) {
1211                 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1212                         *log_suffix++ = 0;
1213                         if (*log_suffix) {
1214                                 sprintf(errpbuf, "[%s]", log_suffix);
1215                                 strcat(ident, errpbuf);
1216                         }
1217                 }
1218                 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1219                 if (log_suffix) *--log_suffix = ':';
1220         }
1221         if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1222         err_malloc:
1223                 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1224                 exit(1);
1225         }
1226         sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1227         if (parms[qflag].count) {
1228                 pending_queue_length = atoi(parms[qflag].arg);
1229                 if (pending_queue_length < 1) {
1230                         fprintf(stderr, "Illegal %s\n", "pending queue length");
1231                         exit(1);
1232                 }
1233         }
1234         if (parms[rflag].count) {
1235                 schedp.sched_priority = atoi(parms[rflag].arg);
1236                 if (schedp.sched_priority
1237                         && (schedp.sched_priority < sched_min
1238                                 || schedp.sched_priority > sched_max)) {
1239                         fprintf(stderr, "Illegal %s\n", "realtime priority");
1240                         exit(1);
1241                 }
1242         }
1243         if (parms[Bflag].count) {
1244                 sockbufsize = atoi(parms[Bflag].arg) << 10;
1245         }
1246         if (parms[bflag].count) {
1247                 bulk_quantity = atoi(parms[bflag].arg);
1248                 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1249                         fprintf(stderr, "Illegal %s\n", "bulk size");
1250                         exit(1);
1251                 }
1252         }
1253         if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1254         if (parms[Xflag].count) {
1255                 for(i = 0; parms[Xflag].arg[i]; i++)
1256                         if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1257                 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1258                         goto err_malloc;
1259                 rule = strtok(parms[Xflag].arg, ":");
1260                 for (i = 0; rule; i++) {
1261                         snmp_rules[i].len = strlen(rule);
1262                         if (snmp_rules[i].len > IFNAMSIZ) {
1263                                 fprintf(stderr, "Illegal %s\n", "interface basename");
1264                                 exit(1);
1265                         }
1266                         strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1267                         if (!*(rule - 1)) *(rule - 1) = ',';
1268                         rule = strtok(NULL, ",");
1269                         if (!rule) {
1270                                 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1271                                 exit(1);
1272                         }
1273                         snmp_rules[i].base = atoi(rule);
1274                         *(rule - 1) = ':';
1275                         rule = strtok(NULL, ":");
1276                 }
1277                 nsnmp_rules = i;
1278         }
1279         if (parms[tflag].count)
1280                 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1281         if (parms[aflag].count) {
1282                 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1283                 bad_lhost:
1284                         fprintf(stderr, "Illegal %s\n", "source address");
1285                         exit(1);
1286                 } else {
1287                         saddr = *((struct sockaddr_in *) res->ai_addr);
1288                         freeaddrinfo(res);
1289                 }
1290         }
1291         if (parms[uflag].count) 
1292                 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1293                         fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1294                         exit(1);
1295                 }
1296
1297
1298         /* Process collectors parameters. Brrrr... :-[ */
1299
1300         npeers = argc - optind;
1301         if (npeers > 1) {
1302                 /* Send to remote Netflow collector */
1303                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1304                 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1305                         dhost = argv[i];
1306                         if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1307                         *dport++ = 0;
1308                         if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1309                                 fprintf(stderr, "socket(): %s\n", strerror(errno));
1310                                 exit(1);
1311                         }
1312                         peers[npeers].write_fd = write_fd;
1313                         peers[npeers].type = PEER_MIRROR;
1314                         peers[npeers].laddr = saddr;
1315                         peers[npeers].seq = 0;
1316                         if ((lhost = strchr(dport, '/'))) {
1317                                 *lhost++ = 0;
1318                                 if ((type = strchr(lhost, '/'))) {
1319                                         *type++ = 0;
1320                                         switch (*type) {
1321                                                 case 0:
1322                                                 case 'm':
1323                                                         break;
1324
1325                                                 case 'r':
1326                                                         peers[npeers].type = PEER_ROTATE;
1327                                                         npeers_rot++;
1328                                                         break;
1329
1330                                                 default:
1331                                                         goto bad_collector;
1332                                         }
1333                                 }
1334                                 if (*lhost) {
1335                                         if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1336                                         peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1337                                         freeaddrinfo(res);
1338                                 }
1339                         }
1340                         if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1341                                                 sizeof(struct sockaddr_in))) {
1342                                 fprintf(stderr, "bind(): %s\n", strerror(errno));
1343                                 exit(1);
1344                         }
1345                         if (getaddrinfo(dhost, dport, &hints, &res)) {
1346 bad_collector:
1347                                 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1348                                 exit(1);
1349                         }
1350                         peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1351                         freeaddrinfo(res);
1352                         if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1353                                                 sizeof(struct sockaddr_in))) {
1354                                 fprintf(stderr, "connect(): %s\n", strerror(errno));
1355                                 exit(1);
1356                         }
1357
1358                         /* Restore command line */
1359                         if (type) *--type = '/';
1360                         if (lhost) *--lhost = '/';
1361                         *--dport = ':';
1362                 }
1363         }
1364         else if (parms[fflag].count) {
1365                 // log into a file
1366                 char *fname;
1367                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1368                 if (!(fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1369                 strncpy(peers[0].fname, parms[fflag].arg, MAX_PATH_LEN);
1370                 
1371                 peers[0].write_fd = -1;
1372                 peers[0].type = PEER_FILE;
1373                 peers[0].seq = 0;
1374                 npeers++;
1375         }
1376         else 
1377                 usage();
1378
1379
1380         if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1381         ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1382         if (!ulog_handle) {
1383                 fprintf(stderr, "libipulog initialization error: %s",
1384                         ipulog_strerror(ipulog_errno));
1385                 exit(1);
1386         }
1387         if (sockbufsize)
1388                 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1389                         &sockbufsize, sizeof(sockbufsize)) < 0)
1390                         fprintf(stderr, "setsockopt(): %s", strerror(errno));
1391
1392         /* Daemonize (if log destination stdout-free) */
1393
1394         my_log_open(ident, verbosity, log_dest);
1395         if (!(log_dest & 2)) {
1396                 switch (fork()) {
1397                         case -1:
1398                                 fprintf(stderr, "fork(): %s", strerror(errno));
1399                                 exit(1);
1400
1401                         case 0:
1402                                 setsid();
1403                                 freopen("/dev/null", "r", stdin);
1404                                 freopen("/dev/null", "w", stdout);
1405                                 freopen("/dev/null", "w", stderr);
1406                                 break;
1407
1408                         default:
1409                                 exit(0);
1410                 }
1411         } else {
1412                 setvbuf(stdout, (char *)0, _IONBF, 0);
1413                 setvbuf(stderr, (char *)0, _IONBF, 0);
1414         }
1415
1416         pid = getpid();
1417         sprintf(errpbuf, "[%ld]", (long) pid);
1418         strcat(ident, errpbuf);
1419
1420         /* Initialization */
1421
1422         hash_init(); /* Actually for crc16 only */
1423         mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1424         for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1425
1426 #ifdef UPTIME_TRICK
1427         /* Hope 12 days is enough :-/ */
1428         start_time_offset = 1 << 20;
1429
1430         /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1431 #endif
1432         gettime(&start_time);
1433
1434         /*
1435         Build static pending queue as circular buffer.
1436         */
1437         if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1438         pending_tail = pending_head;
1439         for (i = pending_queue_length - 1; i--;) {
1440                 if (!(pending_tail->next = mem_alloc())) {
1441                 err_mem_alloc:
1442                         my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1443                         exit(1);
1444                 }
1445                 pending_tail = pending_tail->next;
1446         }
1447         pending_tail->next = pending_head;
1448         pending_tail = pending_head;
1449
1450         sigemptyset(&sig_mask);
1451         sigact.sa_handler = &sighandler;
1452         sigact.sa_mask = sig_mask;
1453         sigact.sa_flags = 0;
1454         sigaddset(&sig_mask, SIGTERM);
1455         sigaction(SIGTERM, &sigact, 0);
1456 #if ((DEBUG) & DEBUG_I)
1457         sigaddset(&sig_mask, SIGUSR1);
1458         sigaction(SIGUSR1, &sigact, 0);
1459 #endif
1460         if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1461                 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1462                 exit(1);
1463         }
1464
1465         my_log(LOG_INFO, "Starting %s...", VERSION);
1466
1467         if (parms[cflag].count) {
1468                 if (chdir(parms[cflag].arg) || chroot(".")) {
1469                         my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1470                         exit(1);
1471                 }
1472         }
1473
1474         schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1475         pthread_attr_init(&tattr);
1476         for (i = 0; i < THREADS - 1; i++) {
1477                 if (schedp.sched_priority > 0) {
1478                         if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1479                                 (pthread_attr_setschedparam(&tattr, &schedp))) {
1480                                 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1481                                 exit(1);
1482                         }
1483                 }
1484                 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1485                         my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1486                         exit(1);
1487                 }
1488                 pthread_detach(thid);
1489                 schedp.sched_priority++;
1490         }
1491
1492         if (pw) {
1493                 if (setgroups(0, NULL)) {
1494                         my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1495                         exit(1);
1496                 }
1497                 if (setregid(pw->pw_gid, pw->pw_gid)) {
1498                         my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1499                         exit(1);
1500                 }
1501                 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1502                         my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1503                         exit(1);
1504                 }
1505         }
1506
1507         if (!(pidfile = fopen(pidfilepath, "w")))
1508                 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1509         else {
1510                 fprintf(pidfile, "%ld\n", (long) pid);
1511                 fclose(pidfile);
1512         }
1513
1514         my_log(LOG_INFO, "pid: %d", pid);
1515         my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1516                 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1517                 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1518                 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1519                 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1520                 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1521                 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1522         for (i = 0; i < nsnmp_rules; i++) {
1523                 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1524                         i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1525         }
1526         for (i = 0; i < npeers; i++) {
1527                 switch (peers[i].type) {
1528                         case PEER_MIRROR:
1529                                 c = 'm';
1530                                 break;
1531                         case PEER_ROTATE:
1532                                 c = 'r';
1533                                 break;
1534                 }
1535                 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1536                 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1537                         inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1538         }
1539
1540         pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1541
1542         timeout.tv_usec = 0;
1543         while (!killed
1544                 || (total_elements - free_elements - pending_queue_length)
1545                 || emit_count
1546                 || pending_tail->flags) {
1547
1548                 if (!sigs) {
1549                         timeout.tv_sec = scan_interval;
1550                         select(0, 0, 0, 0, &timeout);
1551                 }
1552
1553                 if (sigs & SIGTERM_MASK && !killed) {
1554                         sigs &= ~SIGTERM_MASK;
1555                         my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1556                         scan_interval = 1;
1557                         frag_lifetime = -1;
1558                         active_lifetime = -1;
1559                         inactive_lifetime = -1;
1560                         emit_timeout = 1;
1561                         unpending_timeout = 1;
1562                         killed = 1;
1563                         pthread_cond_signal(&scan_cond);
1564                         pthread_cond_signal(&unpending_cond);
1565                 }
1566
1567 #if ((DEBUG) & DEBUG_I)
1568                 if (sigs & SIGUSR1_MASK) {
1569                         sigs &= ~SIGUSR1_MASK;
1570                         info_debug();
1571                 }
1572 #endif
1573         }
1574         remove(pidfilepath);
1575 #if ((DEBUG) & DEBUG_I)
1576         info_debug();
1577 #endif
1578         my_log(LOG_INFO, "Done.");
1579 #ifdef WALL
1580         return 0;
1581 #endif
1582 }