Usage
[fprobe-ulog.git] / src / fprobe-ulog.c
1 /*
2         Copyright (C) Slava Astashonok <sla@0n.ru>
3
4         This program is free software; you can redistribute it and/or
5         modify it under the terms of the GNU General Public License.
6
7         $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
8 */
9
10 #include <common.h>
11
12 /* stdout, stderr, freopen() */
13 #include <stdio.h>
14
15 /* atoi(), exit() */
16 #include <stdlib.h>
17
18 /* getopt(), alarm(), getpid(), setsid(), chdir() */
19 #include <unistd.h>
20
21 /* strerror() */
22 #include <string.h>
23
24 /* sig*() */
25 #include <signal.h>
26
27 #include <libipulog/libipulog.h>
28 struct ipulog_handle {
29         int fd;
30         u_int8_t blocking;
31         struct sockaddr_nl local;
32         struct sockaddr_nl peer;
33         struct nlmsghdr* last_nlhdr;
34 };
35
36 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
37 #include <sys/types.h>
38 #include <netinet/in_systm.h>
39 #include <sys/socket.h>
40 #include <netinet/in.h>
41 #include <arpa/inet.h>
42 #include <netinet/ip.h>
43 #include <netinet/tcp.h>
44 #include <netinet/udp.h>
45 #include <netinet/ip_icmp.h>
46 #include <net/if.h>
47
48 #include <sys/param.h>
49 #include <pwd.h>
50 #ifdef OS_LINUX
51 #include <grp.h>
52 #endif
53
54 /* pthread_*() */
55 #include <pthread.h>
56
57 /* errno */
58 #include <errno.h>
59
60 /* getaddrinfo() */
61 #include <netdb.h>
62
63 /* nanosleep() */
64 #include <time.h>
65
66 /* gettimeofday() */
67 #include <sys/time.h>
68
69 /* scheduling */
70 #include <sched.h>
71
72 /* select() (POSIX)*/
73 #include <sys/select.h>
74
75 /* open() */
76 #include <sys/stat.h>
77 #include <fcntl.h>
78
79 #include <fprobe-ulog.h>
80 #include <my_log.h>
81 #include <my_getopt.h>
82 #include <netflow.h>
83 #include <hash.h>
84 #include <mem.h>
85
/*
 * Symbolic indexes into parms[], one per command-line option letter.
 * The enumerators appear in the same order as the rows of parms[].
 */
enum {
	aflag = 0,
	Bflag,
	bflag,
	cflag,
	dflag,
	eflag,
	Eflag,
	fflag,
	gflag,
	hflag,
	lflag,
	mflag,
	Mflag,
	nflag,
	qflag,
	rflag,
	sflag,
	tflag,
	Tflag,
	Uflag,
	uflag,
	vflag,
	Xflag
};
111
112 static struct getopt_parms parms[] = {
113         {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
114         {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
115         {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
116         {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
117         {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
118         {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
119         {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
120         {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
121         {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
122         {'h', 0, 0, 0},
123         {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
124         {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
125         {'M', 0, 0, 0},
126         {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
127         {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
128         {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
129         {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
130         {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
131         {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
132         {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
133         {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
134         {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
135         {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
136         {0, 0, 0, 0}
137 };
138
/* getopt() state exported by the C library. */
extern char *optarg;
extern int optind, opterr, optopt;
/*
 * errno is deliberately NOT redeclared here: <errno.h> (included above)
 * provides it, usually as a macro expanding to a per-thread lvalue, and
 * ISO C makes a program-supplied declaration of errno undefined behaviour.
 */

/* NetFlow format descriptions selectable through the `netflow' pointer. */
extern struct NetFlow NetFlow1;
extern struct NetFlow NetFlow5;
extern struct NetFlow NetFlow7;
146
147 #define mark_is_tos parms[Mflag].count
148 static unsigned scan_interval = 5;
149 static int frag_lifetime = 30;
150 static int inactive_lifetime = 60;
151 static int active_lifetime = 300;
152 static int sockbufsize;
153 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
154 #if (MEM_BITS == 0) || (MEM_BITS == 16)
155 #define BULK_QUANTITY 10000
156 #else
157 #define BULK_QUANTITY 200
158 #endif
159
160 static unsigned epoch_length=60, log_epochs=1; 
161 static unsigned cur_epoch=0,prev_uptime=0;
162
163 static unsigned bulk_quantity = BULK_QUANTITY;
164 static unsigned pending_queue_length = 100;
165 static struct NetFlow *netflow = &NetFlow5;
166 static unsigned verbosity = 6;
167 static unsigned log_dest = MY_LOG_SYSLOG;
168 static struct Time start_time;
169 static long start_time_offset;
170 static int off_tl;
/* Allocator statistics, defined in mem.c. */
extern unsigned total_elements;
extern unsigned free_elements;
extern unsigned total_memory;

#if ((DEBUG) & DEBUG_I)
/* Counters reported by info_debug(); only built with DEBUG_I. */
static unsigned emit_pkts;
static unsigned emit_queue;
static uint64_t size_total;
static unsigned pkts_total;
static unsigned pkts_total_fragmented;
static unsigned pkts_ignored;
static unsigned pkts_lost_capture;
static unsigned pkts_lost_unpending;
static unsigned pkts_pending;
static unsigned pkts_pending_done;
static unsigned pending_queue_trace;
static unsigned pending_queue_trace_candidate;
static unsigned flows_total;
static unsigned flows_fragmented;
#endif
184 static unsigned emit_count;
185 static uint32_t emit_sequence;
186 static unsigned emit_rate_bytes, emit_rate_delay;
187 static struct Time emit_time;
188 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
189 static pthread_t thid;
190 static sigset_t sig_mask;
191 static struct sched_param schedp;
192 static int sched_min, sched_max;
193 static int npeers, npeers_rot;
194 static struct peer *peers;
195 static int sigs;
196
197 static struct Flow *flows[1 << HASH_BITS];
198 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
199
200 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
201 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
202
203 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
204 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
205 static struct Flow *pending_head, *pending_tail;
206 static struct Flow *scan_frag_dreg;
207
208 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
209 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
210 static struct Flow *flows_emit;
211
212 static char ident[256] = "fprobe-ulog";
213 static FILE *pidfile;
214 static char *pidfilepath;
215 static pid_t pid;
216 static int killed;
217 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
218 static struct ipulog_handle *ulog_handle;
219 static uint32_t ulog_gmask = 1;
220 static char *cap_buf;
221 static int nsnmp_rules;
222 static struct snmp_rule *snmp_rules;
223 static struct passwd *pw = 0;
224
225 void usage()
226 {
227         fprintf(stdout,
228                 "fprobe-ulog: a NetFlow probe. Version %s\n"
229                 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
230                 "\n"
231                 "-h\t\tDisplay this help\n"
232                 "-U <mask>\tULOG group bitwise mask [1]\n"
233                 "-s <seconds>\tHow often scan for expired flows [5]\n"
234                 "-g <seconds>\tFragmented flow lifetime [30]\n"
235                 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
236                 "-f <filename>\tLog flow data in a file\n"
237                 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
238                 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
239                 "-a <address>\tUse <address> as source for NetFlow flow\n"
240                 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
241                 "-M\t\tUse netfilter mark value as ToS flag\n"
242                 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
243                 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
244                 "-q <flows>\tPending queue length [100]\n"
245                 "-B <kilobytes>\tKernel capture buffer size [0]\n"
246                 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
247                 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
248                 "-c <directory>\tDirectory to chroot to\n"
249                 "-u <user>\tUser to run as\n"
250                 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
251                 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
252                 "-y <remote:port>\tAddress of the NetFlow collector\n"
253                 "-f <writable file>\tFile to write data into\n"
254                 "-T <n>\tRotate log file every n epochs\n"
255                 "-E <[1..60]>\tSize of an epoch in minutes\n",
256                 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
257         exit(0);
258 }
259
#if ((DEBUG) & DEBUG_I)
/*
 * Log the DEBUG_I statistics counters at LOG_DEBUG level.
 *
 * All counters are unsigned, so they are printed with %u (and the 64-bit
 * byte total with %llu through an explicit cast); the previous %d / %lld
 * conversions did not match the argument types.
 */
void info_debug()
{
	my_log(LOG_DEBUG, "I: received:%u/%u (%llu) pending:%u/%u",
		pkts_total, pkts_total_fragmented, (unsigned long long) size_total,
		pkts_pending - pkts_pending_done, pending_queue_trace);
	my_log(LOG_DEBUG, "I: ignored:%u lost:%u+%u",
		pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
	my_log(LOG_DEBUG, "I: cache:%u/%u emit:%u/%u/%u",
		flows_total, flows_fragmented, (unsigned) emit_sequence, emit_pkts, emit_queue);
	my_log(LOG_DEBUG, "I: memory:%u/%u (%u)",
		total_elements, free_elements, total_memory);
}
#endif
274
275 void sighandler(int sig)
276 {
277         switch (sig) {
278                 case SIGTERM:
279                         sigs |= SIGTERM_MASK;
280                         break;
281 #if ((DEBUG) & DEBUG_I)
282                 case SIGUSR1:
283                         sigs |= SIGUSR1_MASK;
284                         break;
285 #endif
286         }
287 }
288
289 void gettime(struct Time *now)
290 {
291         struct timeval t;
292
293         gettimeofday(&t, 0);
294         now->sec = t.tv_sec;
295         now->usec = t.tv_usec;
296 }
297
298 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
299 {
300         return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
301 }
302
303 /* Uptime in miliseconds */
304 uint32_t getuptime(struct Time *t)
305 {
306         /* Maximum uptime is about 49/2 days */
307         return cmpmtime(t, &start_time);
308 }
309
310 hash_t hash_flow(struct Flow *flow)
311 {
312         if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
313         else return hash(flow, sizeof(struct Flow_TL));
314 }
315
316 uint16_t snmp_index(char *name) {
317         uint32_t i;
318
319         if (!*name) return 0;
320
321         for (i = 0; (int) i < nsnmp_rules; i++) {
322                 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
323                 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
324         }
325
326         if ((i = if_nametoindex(name))) return i;
327
328         return -1;
329 }
330
331 inline void copy_flow(struct Flow *src, struct Flow *dst)
332 {
333         dst->iif = src->iif;
334         dst->oif = src->oif;
335         dst->sip = src->sip;
336         dst->dip = src->dip;
337         dst->tos = src->tos;
338         dst->proto = src->proto;
339         dst->tcp_flags = src->tcp_flags;
340         dst->id = src->id;
341         dst->sp = src->sp;
342         dst->dp = src->dp;
343         dst->pkts = src->pkts;
344         dst->size = src->size;
345         dst->sizeF = src->sizeF;
346         dst->sizeP = src->sizeP;
347         dst->ctime = src->ctime;
348         dst->mtime = src->mtime;
349         dst->flags = src->flags;
350 }
351
352 unsigned get_log_fd(char *fname, unsigned cur_fd) {
353         struct Time now;
354         unsigned cur_uptime;
355         int ret_fd;
356         gettime(&now);
357         cur_uptime = getuptime(&now);
358
359         /* Epoch lenght in minutes */
360         if ((cur_uptime - prev_uptime) > (1000 * 60 * epoch_length)) {
361                 char nextname[MAX_PATH_LEN];
362                 int write_fd;
363                 prev_uptime = cur_uptime;
364                 cur_epoch = (cur_epoch + 1) % log_epochs;
365                 close(cur_fd);
366                 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
367                 if ((write_fd = open(nextname, O_WRONLY|O_CREAT)) < 0) {
368                         fprintf(stderr, "open(): %s (%s)\n", nextname, strerror(errno));
369                         exit(1);
370                 }
371                 ret_fd = write_fd;
372         }
373         else
374                 ret_fd = cur_fd;
375         return(ret_fd);
376 }
377
378 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
379 {
380         struct Flow **flowpp;
381
382 #ifdef WALL
383         flowpp = 0;
384 #endif
385
386         if (prev) flowpp = *prev;
387
388         while (where) {
389                 if (where->sip.s_addr == what->sip.s_addr
390                         && where->dip.s_addr == what->dip.s_addr
391                         && where->proto == what->proto) {
392                         switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
393                                 case 0:
394                                         /* Both unfragmented */
395                                         if ((what->sp == where->sp)
396                                                 && (what->dp == where->dp)) goto done;
397                                         break;
398                                 case 2:
399                                         /* Both fragmented */
400                                         if (where->id == what->id) goto done;
401                                         break;
402                         }
403                 }
404                 flowpp = &where->next;
405                 where = where->next;
406         }
407 done:
408         if (prev) *prev = flowpp;
409         return where;
410 }
411
412 int put_into(struct Flow *flow, int flag
413 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
414         , char *logbuf
415 #endif
416 )
417 {
418         int ret = 0;
419         hash_t h;
420         struct Flow *flown, **flowpp;
421 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
422         char buf[64];
423 #endif
424
425         h = hash_flow(flow);
426 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
427         sprintf(buf, " %x H:%04x", (unsigned) flow, h);
428         strcat(logbuf, buf);
429 #endif
430         pthread_mutex_lock(&flows_mutex[h]);
431         flowpp = &flows[h];
432         if (!(flown = find(flows[h], flow, &flowpp))) {
433                 /* No suitable flow found - add */
434                 if (flag == COPY_INTO) {
435                         if ((flown = mem_alloc())) {
436                                 copy_flow(flow, flown);
437                                 flow = flown;
438                         } else {
439 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
440                                 my_log(LOG_ERR, "%s %s. %s",
441                                         "mem_alloc():", strerror(errno), "packet lost");
442 #endif
443                                 return -1;
444                         }
445                 }
446                 flow->next = flows[h];
447                 flows[h] = flow;
448 #if ((DEBUG) & DEBUG_I)
449                 flows_total++;
450                 if (flow->flags & FLOW_FRAG) flows_fragmented++;
451 #endif
452 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
453                 if (flown) {
454                         sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
455                         strcat(logbuf, buf);
456                 }
457 #endif
458         } else {
459                 /* Found suitable flow - update */
460 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
461                 sprintf(buf, " +> %x", (unsigned) flown);
462                 strcat(logbuf, buf);
463 #endif
464                 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
465                         flown->mtime = flow->mtime;
466                 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
467                         flown->ctime = flow->ctime;
468                 flown->tcp_flags |= flow->tcp_flags;
469                 flown->size += flow->size;
470                 flown->pkts += flow->pkts;
471                 if (flow->flags & FLOW_FRAG) {
472                         /* Fragmented flow require some additional work */
473                         if (flow->flags & FLOW_TL) {
474                                 /*
475                                 ?FIXME?
476                                 Several packets with FLOW_TL (attack)
477                                 */
478                                 flown->sp = flow->sp;
479                                 flown->dp = flow->dp;
480                         }
481                         if (flow->flags & FLOW_LASTFRAG) {
482                                 /*
483                                 ?FIXME?
484                                 Several packets with FLOW_LASTFRAG (attack)
485                                 */
486                                 flown->sizeP = flow->sizeP;
487                         }
488                         flown->flags |= flow->flags;
489                         flown->sizeF += flow->sizeF;
490                         if ((flown->flags & FLOW_LASTFRAG)
491                                 && (flown->sizeF >= flown->sizeP)) {
492                                 /* All fragments received - flow reassembled */
493                                 *flowpp = flown->next;
494                                 pthread_mutex_unlock(&flows_mutex[h]);
495 #if ((DEBUG) & DEBUG_I)
496                                 flows_total--;
497                                 flows_fragmented--;
498 #endif
499                                 flown->id = 0;
500                                 flown->flags &= ~FLOW_FRAG;
501 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
502                                 strcat(logbuf," R");
503 #endif
504                                 ret = put_into(flown, MOVE_INTO
505 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
506                                                 , logbuf
507 #endif
508                                         );
509                         }
510                 }
511                 if (flag == MOVE_INTO) mem_free(flow);
512         }
513         pthread_mutex_unlock(&flows_mutex[h]);
514         return ret;
515 }
516
517 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
518 {
519         int i;
520
521         for (i = 0; i < fields; i++) {
522 #if ((DEBUG) & DEBUG_F)
523                 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
524 #endif
525                 switch (format[i]) {
526                         case NETFLOW_IPV4_SRC_ADDR:
527                                 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
528                                 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
529                                 break;
530
531                         case NETFLOW_IPV4_DST_ADDR:
532                                 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
533                                 p += NETFLOW_IPV4_DST_ADDR_SIZE;
534                                 break;
535
536                         case NETFLOW_INPUT_SNMP:
537                                 *((uint16_t *) p) = htons(flow->iif);
538                                 p += NETFLOW_INPUT_SNMP_SIZE;
539                                 break;
540
541                         case NETFLOW_OUTPUT_SNMP:
542                                 *((uint16_t *) p) = htons(flow->oif);
543                                 p += NETFLOW_OUTPUT_SNMP_SIZE;
544                                 break;
545
546                         case NETFLOW_PKTS_32:
547                                 *((uint32_t *) p) = htonl(flow->pkts);
548                                 p += NETFLOW_PKTS_32_SIZE;
549                                 break;
550
551                         case NETFLOW_BYTES_32:
552                                 *((uint32_t *) p) = htonl(flow->size);
553                                 p += NETFLOW_BYTES_32_SIZE;
554                                 break;
555
556                         case NETFLOW_FIRST_SWITCHED:
557                                 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
558                                 p += NETFLOW_FIRST_SWITCHED_SIZE;
559                                 break;
560
561                         case NETFLOW_LAST_SWITCHED:
562                                 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
563                                 p += NETFLOW_LAST_SWITCHED_SIZE;
564                                 break;
565
566                         case NETFLOW_L4_SRC_PORT:
567                                 *((uint16_t *) p) = flow->sp;
568                                 p += NETFLOW_L4_SRC_PORT_SIZE;
569                                 break;
570
571                         case NETFLOW_L4_DST_PORT:
572                                 *((uint16_t *) p) = flow->dp;
573                                 p += NETFLOW_L4_DST_PORT_SIZE;
574                                 break;
575
576                         case NETFLOW_PROT:
577                                 *((uint8_t *) p) = flow->proto;
578                                 p += NETFLOW_PROT_SIZE;
579                                 break;
580
581                         case NETFLOW_SRC_TOS:
582                                 *((uint8_t *) p) = flow->tos;
583                                 p += NETFLOW_SRC_TOS_SIZE;
584                                 break;
585
586                         case NETFLOW_TCP_FLAGS:
587                                 *((uint8_t *) p) = flow->tcp_flags;
588                                 p += NETFLOW_TCP_FLAGS_SIZE;
589                                 break;
590
591                         case NETFLOW_VERSION:
592                                 *((uint16_t *) p) = htons(netflow->Version);
593                                 p += NETFLOW_VERSION_SIZE;
594                                 break;
595
596                         case NETFLOW_COUNT:
597                                 *((uint16_t *) p) = htons(emit_count);
598                                 p += NETFLOW_COUNT_SIZE;
599                                 break;
600
601                         case NETFLOW_UPTIME:
602                                 *((uint32_t *) p) = htonl(getuptime(&emit_time));
603                                 p += NETFLOW_UPTIME_SIZE;
604                                 break;
605
606                         case NETFLOW_UNIX_SECS:
607                                 *((uint32_t *) p) = htonl(emit_time.sec);
608                                 p += NETFLOW_UNIX_SECS_SIZE;
609                                 break;
610
611                         case NETFLOW_UNIX_NSECS:
612                                 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
613                                 p += NETFLOW_UNIX_NSECS_SIZE;
614                                 break;
615
616                         case NETFLOW_FLOW_SEQUENCE:
617                                 //*((uint32_t *) p) = htonl(emit_sequence);
618                                 *((uint32_t *) p) = 0;
619                                 p += NETFLOW_FLOW_SEQUENCE_SIZE;
620                                 break;
621
622                         case NETFLOW_PAD8:
623                         /* Unsupported (uint8_t) */
624                         case NETFLOW_ENGINE_TYPE:
625                         case NETFLOW_ENGINE_ID:
626                         case NETFLOW_FLAGS7_1:
627                         case NETFLOW_SRC_MASK:
628                         case NETFLOW_DST_MASK:
629                                 *((uint8_t *) p) = 0;
630                                 p += NETFLOW_PAD8_SIZE;
631                                 break;
632                         case NETFLOW_PLANETLAB_XID:
633                                 *((uint16_t *) p) = flow->tos;
634                                 p += NETFLOW_PLANETLAB_XID_SIZE;
635                                 break;
636                         case NETFLOW_PAD16:
637                         /* Unsupported (uint16_t) */
638                         case NETFLOW_SRC_AS:
639                         case NETFLOW_DST_AS:
640                         case NETFLOW_FLAGS7_2:
641                                 *((uint16_t *) p) = 0;
642                                 p += NETFLOW_PAD16_SIZE;
643                                 break;
644
645                         case NETFLOW_PAD32:
646                         /* Unsupported (uint32_t) */
647                         case NETFLOW_IPV4_NEXT_HOP:
648                         case NETFLOW_ROUTER_SC:
649                                 *((uint32_t *) p) = 0;
650                                 p += NETFLOW_PAD32_SIZE;
651                                 break;
652
653                         default:
654                                 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
655                                         format, i, format[i]);
656                                 exit(1);
657                 }
658         }
659 #if ((DEBUG) & DEBUG_F)
660         my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
661 #endif
662         return p;
663 }
664
665 void setuser() {
666         /*
667         Workaround for clone()-based threads
668         Try to change EUID independently of main thread
669         */
670         if (pw) {
671                 setgroups(0, NULL);
672                 setregid(pw->pw_gid, pw->pw_gid);
673                 setreuid(pw->pw_uid, pw->pw_uid);
674         }
675 }
676
677 void *emit_thread()
678 {
679         struct Flow *flow;
680         void *p;
681         struct timeval now;
682         struct timespec timeout;
683         int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
684
685         p = (void *) &emit_packet + netflow->HeaderSize;
686         timeout.tv_nsec = 0;
687
688         setuser();
689
690         for (;;) {
691                 pthread_mutex_lock(&emit_mutex);
692                 while (!flows_emit) {
693                         gettimeofday(&now, 0);
694                         timeout.tv_sec = now.tv_sec + emit_timeout;
695                         /* Do not wait until emit_packet will filled - it may be too long */
696                         if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
697                                 pthread_mutex_unlock(&emit_mutex);
698                                 goto sendit;
699                         }
700                 }
701                 flow = flows_emit;
702                 flows_emit = flows_emit->next;
703 #if ((DEBUG) & DEBUG_I)
704                 emit_queue--;
705 #endif          
706                 pthread_mutex_unlock(&emit_mutex);
707
708 #ifdef UPTIME_TRICK
709                 if (!emit_count) {
710                         gettime(&start_time);
711                         start_time.sec -= start_time_offset;
712                 }
713 #endif
714                 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
715                 mem_free(flow);
716                 emit_count++;
717 #ifdef PF2_DEBUG
718                 printf("Emit count = %d\n", emit_count);
719                 fflush(stdout);
720 #endif
721                 if (emit_count == netflow->MaxFlows) {
722                 sendit:
723                         gettime(&emit_time);
724                         p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
725                         size = netflow->HeaderSize + emit_count * netflow->FlowSize;
726                         /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
727                         if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
728                         peer_rot_cur = 0;
729                         for (i = 0; i < npeers; i++) {
730                                 if (peers[0].type == PEER_FILE) {
731                                                 if (netflow->SeqOffset)
732                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
733 #define MESSAGES
734                                                 peers[0].write_fd = get_log_fd(peers[0].fname, peers[0].write_fd);
735                                                 ret = write(peers[0].write_fd, emit_packet, size);
736                                                 if (ret < size) {
737 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
738                                                         my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
739                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
740 #endif
741 #undef MESSAGES
742                                                 }
743 #if ((DEBUG) & DEBUG_E)
744                                                 commaneelse {
745                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
746                                                                 emit_count, i + 1, peers[i].seq);
747                                                 }
748 #endif
749                                                 peers[0].seq += emit_count;
750
751                                                 /* Rate limit */
752                                                 if (emit_rate_bytes) {
753                                                         sent += size;
754                                                         delay = sent / emit_rate_bytes;
755                                                         if (delay) {
756                                                                 sent %= emit_rate_bytes;
757                                                                 timeout.tv_sec = 0;
758                                                                 timeout.tv_nsec = emit_rate_delay * delay;
759                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
760                                                         }
761                                                 }
762                                         }
763                                 else
764                                 if (peers[i].type == PEER_MIRROR) goto sendreal;
765                                 else
766                                 if (peers[i].type == PEER_ROTATE) 
767                                         if (peer_rot_cur++ == peer_rot_work) {
768                                         sendreal:
769                                                 if (netflow->SeqOffset)
770                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
771                                                 ret = send(peers[i].write_fd, emit_packet, size, 0);
772                                                 if (ret < size) {
773 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
774                                                         my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
775                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
776 #endif
777                                                 }
778 #if ((DEBUG) & DEBUG_E)
779                                                 commaneelse {
780                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
781                                                                 emit_count, i + 1, peers[i].seq);
782                                                 }
783 #endif
784                                                 peers[i].seq += emit_count;
785
786                                                 /* Rate limit */
787                                                 if (emit_rate_bytes) {
788                                                         sent += size;
789                                                         delay = sent / emit_rate_bytes;
790                                                         if (delay) {
791                                                                 sent %= emit_rate_bytes;
792                                                                 timeout.tv_sec = 0;
793                                                                 timeout.tv_nsec = emit_rate_delay * delay;
794                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
795                                                         }
796                                                 }
797                                         }
798                         }
799                         if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
800                         emit_sequence += emit_count;
801                         emit_count = 0;
802 #if ((DEBUG) & DEBUG_I)
803                         emit_pkts++;
804 #endif
805                 }
806         }
807 }       
808
809 void *unpending_thread()
810 {
811         struct timeval now;
812         struct timespec timeout;
813 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
814         char logbuf[256];
815 #endif
816
817         setuser();
818
819         timeout.tv_nsec = 0;
820         pthread_mutex_lock(&unpending_mutex);
821
822         for (;;) {
823                 while (!(pending_tail->flags & FLOW_PENDING)) {
824                         gettimeofday(&now, 0);
825                         timeout.tv_sec = now.tv_sec + unpending_timeout;
826                         pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
827                 }
828
829 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
830                 *logbuf = 0;
831 #endif
832                 if (put_into(pending_tail, COPY_INTO
833 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
834                                 , logbuf
835 #endif
836                         ) < 0) {
837 #if ((DEBUG) & DEBUG_I)
838                         pkts_lost_unpending++;
839 #endif                          
840                 }
841
842 #if ((DEBUG) & DEBUG_U)
843                 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
844 #endif
845
846                 pending_tail->flags = 0;
847                 pending_tail = pending_tail->next;
848 #if ((DEBUG) & DEBUG_I)
849                 pkts_pending_done++;
850 #endif
851         }
852 }
853
854 void *scan_thread()
855 {
856 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
857         char logbuf[256];
858 #endif
859         int i;
860         struct Flow *flow, **flowpp;
861         struct Time now;
862         struct timespec timeout;
863
864         setuser();
865
866         timeout.tv_nsec = 0;
867         pthread_mutex_lock(&scan_mutex);
868
869         for (;;) {
870                 gettime(&now);
871                 timeout.tv_sec = now.sec + scan_interval;
872                 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
873
874                 gettime(&now);
875 #if ((DEBUG) & DEBUG_S)
876                 my_log(LOG_DEBUG, "S: %d", now.sec);
877 #endif
878                 for (i = 0; i < 1 << HASH_BITS ; i++) {
879                         pthread_mutex_lock(&flows_mutex[i]);
880                         flow = flows[i];
881                         flowpp = &flows[i];
882                         while (flow) {
883                                 if (flow->flags & FLOW_FRAG) {
884                                         /* Process fragmented flow */
885                                         if ((now.sec - flow->mtime.sec) > frag_lifetime) {
886                                                 /* Fragmented flow expired - put it into special chain */
887 #if ((DEBUG) & DEBUG_I)
888                                                 flows_fragmented--;
889                                                 flows_total--;
890 #endif
891                                                 *flowpp = flow->next;
892                                                 flow->id = 0;
893                                                 flow->flags &= ~FLOW_FRAG;
894                                                 flow->next = scan_frag_dreg;
895                                                 scan_frag_dreg = flow;
896                                                 flow = *flowpp;
897                                                 continue;
898                                         }
899                                 } else {
900                                         /* Flow is not frgamented */
901                                         if ((now.sec - flow->mtime.sec) > inactive_lifetime
902                                                 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
903                                                 /* Flow expired */
904 #if ((DEBUG) & DEBUG_S)
905                                                 my_log(LOG_DEBUG, "S: E %x", flow);
906 #endif
907 #if ((DEBUG) & DEBUG_I)
908                                                 flows_total--;
909 #endif
910                                                 *flowpp = flow->next;
911                                                 pthread_mutex_lock(&emit_mutex);
912                                                 flow->next = flows_emit;
913                                                 flows_emit = flow;
914 #if ((DEBUG) & DEBUG_I)
915                                                 emit_queue++;
916 #endif                          
917                                                 pthread_mutex_unlock(&emit_mutex);
918                                                 flow = *flowpp;
919                                                 continue;
920                                         }
921                                 }
922                                 flowpp = &flow->next;
923                                 flow = flow->next;
924                         } /* chain loop */
925                         pthread_mutex_unlock(&flows_mutex[i]);
926                 } /* hash loop */
927                 if (flows_emit) pthread_cond_signal(&emit_cond);
928
929                 while (scan_frag_dreg) {
930                         flow = scan_frag_dreg;
931                         scan_frag_dreg = flow->next;
932 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
933                         *logbuf = 0;
934 #endif
935                         put_into(flow, MOVE_INTO
936 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
937                                 , logbuf
938 #endif
939                         );
940 #if ((DEBUG) & DEBUG_S)
941                         my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
942 #endif
943                 }
944         }
945 }
946
947 void *cap_thread()
948 {
949         struct ulog_packet_msg *ulog_msg;
950         struct ip *nl;
951         void *tl;
952         struct Flow *flow;
953         int len, off_frag, psize;
954 #if ((DEBUG) & DEBUG_C)
955         char buf[64];
956         char logbuf[256];
957 #endif
958
959         setuser();
960
961         while (!killed) {
962                 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
963                 if (len <= 0) {
964                         my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
965                         continue;
966                 }
967                 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
968
969 #if ((DEBUG) & DEBUG_C)
970                         sprintf(logbuf, "C: %d", ulog_msg->data_len);
971 #endif
972
973                         nl = (void *) &ulog_msg->payload;
974                         psize = ulog_msg->data_len;
975
976                         /* Sanity check */
977                         if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
978 #if ((DEBUG) & DEBUG_C)
979                                 strcat(logbuf, " U");
980                                 my_log(LOG_DEBUG, "%s", logbuf);
981 #endif
982 #if ((DEBUG) & DEBUG_I)
983                                 pkts_ignored++;
984 #endif
985                                 continue;
986                         }
987
988                         if (pending_head->flags) {
989 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
990                                 my_log(LOG_ERR,
991 # if ((DEBUG) & DEBUG_C)
992                                         "%s %s %s", logbuf,
993 # else
994                                         "%s %s",
995 # endif
996                                         "pending queue full:", "packet lost");
997 #endif
998 #if ((DEBUG) & DEBUG_I)
999                                 pkts_lost_capture++;
1000 #endif
1001                                 goto done;
1002                         }
1003
1004 #if ((DEBUG) & DEBUG_I)
1005                         pkts_total++;
1006 #endif
1007
1008                         flow = pending_head;
1009
1010                         /* ?FIXME? Add sanity check for ip_len? */
1011                         flow->size = ntohs(nl->ip_len);
1012 #if ((DEBUG) & DEBUG_I)
1013                         size_total += flow->size;
1014 #endif
1015
1016                         flow->sip = nl->ip_src;
1017                         flow->dip = nl->ip_dst;
1018                         flow->iif = snmp_index(ulog_msg->indev_name);
1019                         flow->oif = snmp_index(ulog_msg->outdev_name);
1020                         flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1021                         flow->proto = nl->ip_p;
1022                         flow->id = 0;
1023                         flow->tcp_flags = 0;
1024                         flow->pkts = 1;
1025                         flow->sizeF = 0;
1026                         flow->sizeP = 0;
1027                         /* Packets captured from OUTPUT table didn't contains valid timestamp */
1028                         if (ulog_msg->timestamp_sec) {
1029                                 flow->ctime.sec = ulog_msg->timestamp_sec;
1030                                 flow->ctime.usec = ulog_msg->timestamp_usec;
1031                         } else gettime(&flow->ctime);
1032                         flow->mtime = flow->ctime;
1033
1034                         off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1035
1036                         /*
1037                         Offset (from network layer) to transport layer header/IP data
1038                         IOW IP header size ;-)
1039
1040                         ?FIXME?
1041                         Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1042                         */
1043                         off_tl = nl->ip_hl << 2;
1044                         tl = (void *) nl + off_tl;
1045
1046                         /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1047                         flow->sizeF = ntohs(nl->ip_len) - off_tl;
1048                         psize -= off_tl;
1049                         if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1050                         if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1051
1052                         if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1053                                 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1054 #if ((DEBUG) & DEBUG_C)
1055                                 strcat(logbuf, " F");
1056 #endif
1057 #if ((DEBUG) & DEBUG_I)
1058                                 pkts_total_fragmented++;
1059 #endif
1060                                 flow->flags |= FLOW_FRAG;
1061                                 flow->id = nl->ip_id;
1062
1063                                 if (!(ntohs(nl->ip_off) & IP_MF)) {
1064                                         /* Packet whith IP_MF contains information about whole datagram size */
1065                                         flow->flags |= FLOW_LASTFRAG;
1066                                         /* size = frag_offset*8 + data_size */
1067                                         flow->sizeP = off_frag + flow->sizeF;
1068                                 }
1069                         }
1070
1071 #if ((DEBUG) & DEBUG_C)
1072                         sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1073                         strcat(logbuf, buf);
1074                         sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1075                         strcat(logbuf, buf);
1076 #endif
1077
1078                         /*
1079                         Fortunately most interesting transport layer information fit
1080                         into first 8 bytes of IP data field (minimal nonzero size).
1081                         Thus we don't need actual packet reassembling to build whole
1082                         transport layer data. We only check the fragment offset for
1083                         zero value to find packet with this information.
1084                         */
1085                         if (!off_frag && psize >= 8) {
1086                                 switch (flow->proto) {
1087                                         case IPPROTO_TCP:
1088                                         case IPPROTO_UDP:
1089                                                 flow->sp = ((struct udphdr *)tl)->uh_sport;
1090                                                 flow->dp = ((struct udphdr *)tl)->uh_dport;
1091                                                 goto tl_known;
1092
1093 #ifdef ICMP_TRICK
1094                                         case IPPROTO_ICMP:
1095                                                 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1096                                                 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1097                                                 goto tl_known;
1098 #endif
1099 #ifdef ICMP_TRICK_CISCO
1100                                         case IPPROTO_ICMP:
1101                                                 flow->dp = *((int32_t *) tl);
1102                                                 goto tl_known;
1103 #endif
1104
1105                                         default:
1106                                                 /* Unknown transport layer */
1107 #if ((DEBUG) & DEBUG_C)
1108                                                 strcat(logbuf, " U");
1109 #endif
1110                                                 flow->sp = 0;
1111                                                 flow->dp = 0;
1112                                                 break;
1113
1114                                         tl_known:
1115 #if ((DEBUG) & DEBUG_C)
1116                                                 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1117                                                 strcat(logbuf, buf);
1118 #endif
1119                                                 flow->flags |= FLOW_TL;
1120                                 }
1121                         }
1122
1123                         /* Check for tcp flags presence (including CWR and ECE). */
1124                         if (flow->proto == IPPROTO_TCP
1125                                 && off_frag < 16
1126                                 && psize >= 16 - off_frag) {
1127                                 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1128 #if ((DEBUG) & DEBUG_C)
1129                                 sprintf(buf, " TCP:%x", flow->tcp_flags);
1130                                 strcat(logbuf, buf);
1131 #endif
1132                         }
1133
1134 #if ((DEBUG) & DEBUG_C)
1135                         sprintf(buf, " => %x", (unsigned) flow);
1136                         strcat(logbuf, buf);
1137                         my_log(LOG_DEBUG, "%s", logbuf);
1138 #endif
1139
1140 #if ((DEBUG) & DEBUG_I)
1141                         pkts_pending++;
1142                         pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1143                         if (pending_queue_trace < pending_queue_trace_candidate)
1144                                 pending_queue_trace = pending_queue_trace_candidate;
1145 #endif
1146
1147                         /* Flow complete - inform unpending_thread() about it */
1148                         pending_head->flags |= FLOW_PENDING;
1149                         pending_head = pending_head->next;
1150                 done:
1151                         pthread_cond_signal(&unpending_cond);
1152                 }
1153         }
1154         return 0;
1155 }
1156
1157 int main(int argc, char **argv)
1158 {
1159         char errpbuf[512];
1160         char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1161         int c, i, write_fd, memory_limit = 0;
1162         struct addrinfo hints, *res;
1163         struct sockaddr_in saddr;
1164         pthread_attr_t tattr;
1165         struct sigaction sigact;
1166         static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1167         struct timeval timeout;
1168
1169         sched_min = sched_get_priority_min(SCHED);
1170         sched_max = sched_get_priority_max(SCHED);
1171
1172         memset(&saddr, 0 , sizeof(saddr));
1173         memset(&hints, 0 , sizeof(hints));
1174         hints.ai_flags = AI_PASSIVE;
1175         hints.ai_family = AF_INET;
1176         hints.ai_socktype = SOCK_DGRAM;
1177
1178         /* Process command line options */
1179
1180         opterr = 0;
1181         while ((c = my_getopt(argc, argv, parms)) != -1) {
1182                 switch (c) {
1183                         case '?':
1184                                 usage();
1185
1186                         case 'h':
1187                                 usage();
1188                 }
1189         }
1190
1191         if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1192         if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1193         if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1194         if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1195         if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1196         if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1197         if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1198         if (parms[nflag].count) {
1199                 switch (atoi(parms[nflag].arg)) {
1200                         case 1:
1201                                 netflow = &NetFlow1;
1202                                 break;
1203
1204                         case 5:
1205                                 break;
1206
1207                         case 7:
1208                                 netflow = &NetFlow7;
1209                                 break;
1210
1211                         default:
1212                                 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1213                                 exit(1);
1214                 }
1215         }
1216         if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1217         if (parms[lflag].count) {
1218                 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1219                         *log_suffix++ = 0;
1220                         if (*log_suffix) {
1221                                 sprintf(errpbuf, "[%s]", log_suffix);
1222                                 strcat(ident, errpbuf);
1223                         }
1224                 }
1225                 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1226                 if (log_suffix) *--log_suffix = ':';
1227         }
1228         if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1229         err_malloc:
1230                 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1231                 exit(1);
1232         }
1233         sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1234         if (parms[qflag].count) {
1235                 pending_queue_length = atoi(parms[qflag].arg);
1236                 if (pending_queue_length < 1) {
1237                         fprintf(stderr, "Illegal %s\n", "pending queue length");
1238                         exit(1);
1239                 }
1240         }
1241         if (parms[rflag].count) {
1242                 schedp.sched_priority = atoi(parms[rflag].arg);
1243                 if (schedp.sched_priority
1244                         && (schedp.sched_priority < sched_min
1245                                 || schedp.sched_priority > sched_max)) {
1246                         fprintf(stderr, "Illegal %s\n", "realtime priority");
1247                         exit(1);
1248                 }
1249         }
1250         if (parms[Bflag].count) {
1251                 sockbufsize = atoi(parms[Bflag].arg) << 10;
1252         }
1253         if (parms[bflag].count) {
1254                 bulk_quantity = atoi(parms[bflag].arg);
1255                 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1256                         fprintf(stderr, "Illegal %s\n", "bulk size");
1257                         exit(1);
1258                 }
1259         }
1260         if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1261         if (parms[Xflag].count) {
1262                 for(i = 0; parms[Xflag].arg[i]; i++)
1263                         if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1264                 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1265                         goto err_malloc;
1266                 rule = strtok(parms[Xflag].arg, ":");
1267                 for (i = 0; rule; i++) {
1268                         snmp_rules[i].len = strlen(rule);
1269                         if (snmp_rules[i].len > IFNAMSIZ) {
1270                                 fprintf(stderr, "Illegal %s\n", "interface basename");
1271                                 exit(1);
1272                         }
1273                         strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1274                         if (!*(rule - 1)) *(rule - 1) = ',';
1275                         rule = strtok(NULL, ",");
1276                         if (!rule) {
1277                                 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1278                                 exit(1);
1279                         }
1280                         snmp_rules[i].base = atoi(rule);
1281                         *(rule - 1) = ':';
1282                         rule = strtok(NULL, ":");
1283                 }
1284                 nsnmp_rules = i;
1285         }
1286         if (parms[tflag].count)
1287                 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1288         if (parms[aflag].count) {
1289                 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1290                 bad_lhost:
1291                         fprintf(stderr, "Illegal %s\n", "source address");
1292                         exit(1);
1293                 } else {
1294                         saddr = *((struct sockaddr_in *) res->ai_addr);
1295                         freeaddrinfo(res);
1296                 }
1297         }
1298         if (parms[uflag].count) 
1299                 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1300                         fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1301                         exit(1);
1302                 }
1303
1304
1305         /* Process collectors parameters. Brrrr... :-[ */
1306
1307         npeers = argc - optind;
1308         if (npeers > 1) {
1309                 /* Send to remote Netflow collector */
1310                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1311                 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1312                         dhost = argv[i];
1313                         if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1314                         *dport++ = 0;
1315                         if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1316                                 fprintf(stderr, "socket(): %s\n", strerror(errno));
1317                                 exit(1);
1318                         }
1319                         peers[npeers].write_fd = write_fd;
1320                         peers[npeers].type = PEER_MIRROR;
1321                         peers[npeers].laddr = saddr;
1322                         peers[npeers].seq = 0;
1323                         if ((lhost = strchr(dport, '/'))) {
1324                                 *lhost++ = 0;
1325                                 if ((type = strchr(lhost, '/'))) {
1326                                         *type++ = 0;
1327                                         switch (*type) {
1328                                                 case 0:
1329                                                 case 'm':
1330                                                         break;
1331
1332                                                 case 'r':
1333                                                         peers[npeers].type = PEER_ROTATE;
1334                                                         npeers_rot++;
1335                                                         break;
1336
1337                                                 default:
1338                                                         goto bad_collector;
1339                                         }
1340                                 }
1341                                 if (*lhost) {
1342                                         if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1343                                         peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1344                                         freeaddrinfo(res);
1345                                 }
1346                         }
1347                         if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1348                                                 sizeof(struct sockaddr_in))) {
1349                                 fprintf(stderr, "bind(): %s\n", strerror(errno));
1350                                 exit(1);
1351                         }
1352                         if (getaddrinfo(dhost, dport, &hints, &res)) {
1353 bad_collector:
1354                                 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1355                                 exit(1);
1356                         }
1357                         peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1358                         freeaddrinfo(res);
1359                         if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1360                                                 sizeof(struct sockaddr_in))) {
1361                                 fprintf(stderr, "connect(): %s\n", strerror(errno));
1362                                 exit(1);
1363                         }
1364
1365                         /* Restore command line */
1366                         if (type) *--type = '/';
1367                         if (lhost) *--lhost = '/';
1368                         *--dport = ':';
1369                 }
1370         }
1371         else if (parms[fflag].count) {
1372                 // log into a file
1373                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1374                 if (!(peers[0].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1375                 strncpy(peers[0].fname, parms[fflag].arg, MAX_PATH_LEN);
1376                 
1377                 peers[0].write_fd = -1;
1378                 peers[0].type = PEER_FILE;
1379                 peers[0].seq = 0;
1380                 npeers++;
1381         }
1382         else 
1383                 usage();
1384
1385
1386         if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1387         ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1388         if (!ulog_handle) {
1389                 fprintf(stderr, "libipulog initialization error: %s",
1390                         ipulog_strerror(ipulog_errno));
1391                 exit(1);
1392         }
1393         if (sockbufsize)
1394                 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1395                         &sockbufsize, sizeof(sockbufsize)) < 0)
1396                         fprintf(stderr, "setsockopt(): %s", strerror(errno));
1397
1398         /* Daemonize (if log destination stdout-free) */
1399
1400         my_log_open(ident, verbosity, log_dest);
1401         if (!(log_dest & 2)) {
1402                 switch (fork()) {
1403                         case -1:
1404                                 fprintf(stderr, "fork(): %s", strerror(errno));
1405                                 exit(1);
1406
1407                         case 0:
1408                                 setsid();
1409                                 freopen("/dev/null", "r", stdin);
1410                                 freopen("/dev/null", "w", stdout);
1411                                 freopen("/dev/null", "w", stderr);
1412                                 break;
1413
1414                         default:
1415                                 exit(0);
1416                 }
1417         } else {
1418                 setvbuf(stdout, (char *)0, _IONBF, 0);
1419                 setvbuf(stderr, (char *)0, _IONBF, 0);
1420         }
1421
1422         pid = getpid();
1423         sprintf(errpbuf, "[%ld]", (long) pid);
1424         strcat(ident, errpbuf);
1425
1426         /* Initialization */
1427
1428         hash_init(); /* Actually for crc16 only */
1429         mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1430         for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1431
1432 #ifdef UPTIME_TRICK
1433         /* Hope 12 days is enough :-/ */
1434         start_time_offset = 1 << 20;
1435
1436         /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1437 #endif
1438         gettime(&start_time);
1439
1440         /*
1441         Build static pending queue as circular buffer.
1442         */
1443         if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1444         pending_tail = pending_head;
1445         for (i = pending_queue_length - 1; i--;) {
1446                 if (!(pending_tail->next = mem_alloc())) {
1447                 err_mem_alloc:
1448                         my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1449                         exit(1);
1450                 }
1451                 pending_tail = pending_tail->next;
1452         }
1453         pending_tail->next = pending_head;
1454         pending_tail = pending_head;
1455
1456         sigemptyset(&sig_mask);
1457         sigact.sa_handler = &sighandler;
1458         sigact.sa_mask = sig_mask;
1459         sigact.sa_flags = 0;
1460         sigaddset(&sig_mask, SIGTERM);
1461         sigaction(SIGTERM, &sigact, 0);
1462 #if ((DEBUG) & DEBUG_I)
1463         sigaddset(&sig_mask, SIGUSR1);
1464         sigaction(SIGUSR1, &sigact, 0);
1465 #endif
1466         if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1467                 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1468                 exit(1);
1469         }
1470
1471         my_log(LOG_INFO, "Starting %s...", VERSION);
1472
1473         if (parms[cflag].count) {
1474                 if (chdir(parms[cflag].arg) || chroot(".")) {
1475                         my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1476                         exit(1);
1477                 }
1478         }
1479
1480         schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1481         pthread_attr_init(&tattr);
1482         for (i = 0; i < THREADS - 1; i++) {
1483                 if (schedp.sched_priority > 0) {
1484                         if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1485                                 (pthread_attr_setschedparam(&tattr, &schedp))) {
1486                                 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1487                                 exit(1);
1488                         }
1489                 }
1490                 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1491                         my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1492                         exit(1);
1493                 }
1494                 pthread_detach(thid);
1495                 schedp.sched_priority++;
1496         }
1497
1498         if (pw) {
1499                 if (setgroups(0, NULL)) {
1500                         my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1501                         exit(1);
1502                 }
1503                 if (setregid(pw->pw_gid, pw->pw_gid)) {
1504                         my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1505                         exit(1);
1506                 }
1507                 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1508                         my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1509                         exit(1);
1510                 }
1511         }
1512
1513         if (!(pidfile = fopen(pidfilepath, "w")))
1514                 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1515         else {
1516                 fprintf(pidfile, "%ld\n", (long) pid);
1517                 fclose(pidfile);
1518         }
1519
1520         my_log(LOG_INFO, "pid: %d", pid);
1521         my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1522                 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1523                 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1524                 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1525                 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1526                 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1527                 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1528         for (i = 0; i < nsnmp_rules; i++) {
1529                 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1530                         i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1531         }
1532         for (i = 0; i < npeers; i++) {
1533                 switch (peers[i].type) {
1534                         case PEER_MIRROR:
1535                                 c = 'm';
1536                                 break;
1537                         case PEER_ROTATE:
1538                                 c = 'r';
1539                                 break;
1540                 }
1541                 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1542                 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1543                         inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1544         }
1545
1546         pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1547
1548         timeout.tv_usec = 0;
1549         while (!killed
1550                 || (total_elements - free_elements - pending_queue_length)
1551                 || emit_count
1552                 || pending_tail->flags) {
1553
1554                 if (!sigs) {
1555                         timeout.tv_sec = scan_interval;
1556                         select(0, 0, 0, 0, &timeout);
1557                 }
1558
1559                 if (sigs & SIGTERM_MASK && !killed) {
1560                         sigs &= ~SIGTERM_MASK;
1561                         my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1562                         scan_interval = 1;
1563                         frag_lifetime = -1;
1564                         active_lifetime = -1;
1565                         inactive_lifetime = -1;
1566                         emit_timeout = 1;
1567                         unpending_timeout = 1;
1568                         killed = 1;
1569                         pthread_cond_signal(&scan_cond);
1570                         pthread_cond_signal(&unpending_cond);
1571                 }
1572
1573 #if ((DEBUG) & DEBUG_I)
1574                 if (sigs & SIGUSR1_MASK) {
1575                         sigs &= ~SIGUSR1_MASK;
1576                         info_debug();
1577                 }
1578 #endif
1579         }
1580         remove(pidfilepath);
1581 #if ((DEBUG) & DEBUG_I)
1582         info_debug();
1583 #endif
1584         my_log(LOG_INFO, "Done.");
1585 #ifdef WALL
1586         return 0;
1587 #endif
1588 }