Changes:
[fprobe-ulog.git] / src / fprobe-ulog.c
1 /*
2         Copyright (C) Slava Astashonok <sla@0n.ru>
3
4         This program is free software; you can redistribute it and/or
5         modify it under the terms of the GNU General Public License.
6
7         $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
8
9         7/11/2007       Sapan Bhatia <sapanb@cs.princeton.edu> 
10                         
11                 Added data collection (-f) functionality, xid support in the header and log file
12                 rotation.
13 */
14
15 #include <common.h>
16
17 /* stdout, stderr, freopen() */
18 #include <stdio.h>
19
20 /* atoi(), exit() */
21 #include <stdlib.h>
22
23 /* getopt(), alarm(), getpid(), setsid(), chdir() */
24 #include <unistd.h>
25
26 /* strerror() */
27 #include <string.h>
28
29 /* sig*() */
30 #include <signal.h>
31
32 /* statfs() */
33
34 #include <sys/statfs.h>
35
36 #include <libipulog/libipulog.h>
/*
 * Local definition of libipulog's handle structure.
 *
 * It is defined here, after <libipulog/libipulog.h> has been included, which
 * only compiles if that header leaves struct ipulog_handle incomplete.
 * Presumably this exists so the file can reach the members (such as the
 * netlink socket descriptor) directly.
 * NOTE(review): the member order and types must match the libipulog build in
 * use. Confirm this whenever the library is upgraded.
 */
struct ipulog_handle {
	int fd;				/* netlink socket descriptor (presumably) */
	u_int8_t blocking;
	struct sockaddr_nl local;	/* our netlink address */
	struct sockaddr_nl peer;	/* kernel-side netlink address */
	struct nlmsghdr* last_nlhdr;
};
44
45 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
46 #include <sys/types.h>
47 #include <netinet/in_systm.h>
48 #include <sys/socket.h>
49 #include <netinet/in.h>
50 #include <arpa/inet.h>
51 #include <netinet/ip.h>
52 #include <netinet/tcp.h>
53 #include <netinet/udp.h>
54 #include <netinet/ip_icmp.h>
55 #include <net/if.h>
56
57 #include <sys/param.h>
58 #include <pwd.h>
59 #ifdef OS_LINUX
60 #include <grp.h>
61 #endif
62
63 /* pthread_*() */
64 #include <pthread.h>
65
66 /* errno */
67 #include <errno.h>
68
69 /* getaddrinfo() */
70 #include <netdb.h>
71
72 /* nanosleep() */
73 #include <time.h>
74
75 /* gettimeofday() */
76 #include <sys/time.h>
77
78 /* scheduling */
79 #include <sched.h>
80
81 /* select() (POSIX)*/
82 #include <sys/select.h>
83
84 /* open() */
85 #include <sys/stat.h>
86 #include <fcntl.h>
87
88 #include <fprobe-ulog.h>
89 #include <my_log.h>
90 #include <my_getopt.h>
91 #include <netflow.h>
92 #include <hash.h>
93 #include <mem.h>
94
95 enum {
96         aflag,
97         Bflag,
98         bflag,
99         cflag,
100         dflag,
101         Dflag,
102         eflag,
103         Eflag,
104         fflag,
105         gflag,
106         hflag,
107         lflag,
108         mflag,
109         Mflag,
110         nflag,
111         qflag,
112         rflag,
113         sflag,
114         tflag,
115         Tflag,
116         Uflag,
117         uflag,
118         vflag,
119         Wflag,
120         Xflag,
121 };
122
123 static struct getopt_parms parms[] = {
124         {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
125         {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
126         {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
127         {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
128         {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
129         {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
130         {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
131         {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
132         {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
133         {'h', 0, 0, 0},
134         {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
135         {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
136         {'M', 0, 0, 0},
137         {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
138         {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
139         {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
140         {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
141         {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
142         {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
143         {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
144         {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
145         {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
146         {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
147         {0, 0, 0, 0}
148 };
149
/* getopt() state, declared here for the option parser. */
extern char *optarg;
extern int optind, opterr, optopt;
/*
 * errno is intentionally not redeclared. <errno.h> (included above) provides
 * it, and the C standard leaves unspecified whether errno is an object or a
 * macro; glibc and musl use a macro that expands to a per-thread lvalue.
 * A manual "extern int errno;" is therefore non-portable (CERT MSC38-C).
 */

/* NetFlow export format descriptors (v1, v5, v7), defined elsewhere. */
extern struct NetFlow NetFlow1;
extern struct NetFlow NetFlow5;
extern struct NetFlow NetFlow7;
157
158 #define START_VALUE -5
159 #define mark_is_tos parms[Mflag].count
160 static unsigned scan_interval = 5;
161 static int min_free = 0;
162 static int frag_lifetime = 30;
163 static int inactive_lifetime = 60;
164 static int active_lifetime = 300;
165 static int sockbufsize;
166 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
167 #if (MEM_BITS == 0) || (MEM_BITS == 16)
168 #define BULK_QUANTITY 10000
169 #else
170 #define BULK_QUANTITY 200
171 #endif
172
173 static unsigned epoch_length=60, log_epochs=1; 
174 static unsigned cur_epoch=0,prev_uptime=0;
175
176 static unsigned bulk_quantity = BULK_QUANTITY;
177 static unsigned pending_queue_length = 100;
178 static struct NetFlow *netflow = &NetFlow5;
179 static unsigned verbosity = 6;
180 static unsigned log_dest = MY_LOG_SYSLOG;
181 static struct Time start_time;
182 static long start_time_offset;
183 static int off_tl;
184 /* From mem.c */
185 extern unsigned total_elements;
186 extern unsigned free_elements;
187 extern unsigned total_memory;
188 #if ((DEBUG) & DEBUG_I)
189 static unsigned emit_pkts, emit_queue;
190 static uint64_t size_total;
191 static unsigned pkts_total, pkts_total_fragmented;
192 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
193 static unsigned pkts_pending, pkts_pending_done;
194 static unsigned pending_queue_trace, pending_queue_trace_candidate;
195 static unsigned flows_total, flows_fragmented;
196 #endif
197 static unsigned emit_count;
198 static uint32_t emit_sequence;
199 static unsigned emit_rate_bytes, emit_rate_delay;
200 static struct Time emit_time;
201 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
202 static pthread_t thid;
203 static sigset_t sig_mask;
204 static struct sched_param schedp;
205 static int sched_min, sched_max;
206 static int npeers, npeers_rot;
207 static struct peer *peers;
208 static int sigs;
209
210 static struct Flow *flows[1 << HASH_BITS];
211 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
212
213 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
214 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
215
216 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
217 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
218 static struct Flow *pending_head, *pending_tail;
219 static struct Flow *scan_frag_dreg;
220
221 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
222 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
223 static struct Flow *flows_emit;
224
225 static char ident[256] = "fprobe-ulog";
226 static FILE *pidfile;
227 static char *pidfilepath;
228 static pid_t pid;
229 static int killed;
230 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
231 static struct ipulog_handle *ulog_handle;
232 static uint32_t ulog_gmask = 1;
233 static char *cap_buf;
234 static int nsnmp_rules;
235 static struct snmp_rule *snmp_rules;
236 static struct passwd *pw = 0;
237
238 void usage()
239 {
240         fprintf(stdout,
241                 "fprobe-ulog: a NetFlow probe. Version %s\n"
242                 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
243                 "\n"
244                 "-h\t\tDisplay this help\n"
245                 "-U <mask>\tULOG group bitwise mask [1]\n"
246                 "-s <seconds>\tHow often scan for expired flows [5]\n"
247                 "-g <seconds>\tFragmented flow lifetime [30]\n"
248                 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
249                 "-f <filename>\tLog flow data in a file\n"
250                 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
251                 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
252                 "-a <address>\tUse <address> as source for NetFlow flow\n"
253                 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
254                 "-M\t\tUse netfilter mark value as ToS flag\n"
255                 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
256                 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
257                 "-q <flows>\tPending queue length [100]\n"
258                 "-B <kilobytes>\tKernel capture buffer size [0]\n"
259                 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
260                 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
261                 "-c <directory>\tDirectory to chroot to\n"
262                 "-u <user>\tUser to run as\n"
263                 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
264                 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
265                 "-y <remote:port>\tAddress of the NetFlow collector\n"
266                 "-f <writable file>\tFile to write data into\n"
267                 "-T <n>\tRotate log file every n epochs\n"
268                 "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
269                 "-E <[1..60]>\tSize of an epoch in minutes\n"
270                 "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
271                 ,
272                 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
273         exit(0);
274 }
275
#if ((DEBUG) & DEBUG_I)
/*
 * Log the DEBUG_I statistics counters (capture, flow cache, emitter and
 * memory pool) at LOG_DEBUG level.
 *
 * Every counter is declared unsigned, and size_total is uint64_t. They are
 * printed with %u/%llu; the previous %d/%lld did not match those types.
 */
void info_debug(void)
{
	my_log(LOG_DEBUG, "I: received:%u/%u (%llu) pending:%u/%u",
		pkts_total, pkts_total_fragmented,
		(unsigned long long) size_total,
		pkts_pending - pkts_pending_done, pending_queue_trace);
	my_log(LOG_DEBUG, "I: ignored:%u lost:%u+%u",
		pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
	my_log(LOG_DEBUG, "I: cache:%u/%u emit:%u/%u/%u",
		flows_total, flows_fragmented, (unsigned) emit_sequence,
		emit_pkts, emit_queue);
	my_log(LOG_DEBUG, "I: memory:%u/%u (%u)",
		total_elements, free_elements, total_memory);
}
#endif
290
291 void sighandler(int sig)
292 {
293         switch (sig) {
294                 case SIGTERM:
295                         sigs |= SIGTERM_MASK;
296                         break;
297 #if ((DEBUG) & DEBUG_I)
298                 case SIGUSR1:
299                         sigs |= SIGUSR1_MASK;
300                         break;
301 #endif
302         }
303 }
304
305 void gettime(struct Time *now)
306 {
307         struct timeval t;
308
309         gettimeofday(&t, 0);
310         now->sec = t.tv_sec;
311         now->usec = t.tv_usec;
312 }
313
314
315 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
316 {
317         return (t1->sec - t2->sec)/60;
318 }
319
320 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
321 {
322         return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
323 }
324
325 /* Uptime in miliseconds */
326 uint32_t getuptime(struct Time *t)
327 {
328         /* Maximum uptime is about 49/2 days */
329         return cmpmtime(t, &start_time);
330 }
331
332 /* Uptime in minutes */
333 uint32_t getuptime_minutes(struct Time *t)
334 {
335         /* Maximum uptime is about 49/2 days */
336         return cmpMtime(t, &start_time);
337 }
338
339 hash_t hash_flow(struct Flow *flow)
340 {
341         if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
342         else return hash(flow, sizeof(struct Flow_TL));
343 }
344
345 uint16_t snmp_index(char *name) {
346         uint32_t i;
347
348         if (!*name) return 0;
349
350         for (i = 0; (int) i < nsnmp_rules; i++) {
351                 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
352                 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
353         }
354
355         if ((i = if_nametoindex(name))) return i;
356
357         return -1;
358 }
359
360 inline void copy_flow(struct Flow *src, struct Flow *dst)
361 {
362         dst->iif = src->iif;
363         dst->oif = src->oif;
364         dst->sip = src->sip;
365         dst->dip = src->dip;
366         dst->tos = src->tos;
367         dst->proto = src->proto;
368         dst->tcp_flags = src->tcp_flags;
369         dst->id = src->id;
370         dst->sp = src->sp;
371         dst->dp = src->dp;
372         dst->pkts = src->pkts;
373         dst->size = src->size;
374         dst->sizeF = src->sizeF;
375         dst->sizeP = src->sizeP;
376         dst->ctime = src->ctime;
377         dst->mtime = src->mtime;
378         dst->flags = src->flags;
379 }
380
381 void update_cur_epoch_file(int n) {
382         int fd, len;
383         char snum[7];
384         len=snprintf(snum,6,"%d",n);
385         fd = open("/tmp/fprobe_last_epoch",O_WRONLY|O_CREAT|O_TRUNC);
386         if (fd == -1) {
387                 my_log(LOG_ERR, "open() failed: /tmp/fprobe_last_epoch.The next restart will resume logging from epoch id 0.");
388                 return;
389         }
390         write(fd, snum, len);
391         close(fd);
392 }
393
394 unsigned get_log_fd(char *fname, unsigned cur_fd) {
395         struct Time now;
396         unsigned cur_uptime;
397         /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
398          * doesn't solve the problem */
399
400         struct statfs statfs;
401         int ret_fd;
402         gettime(&now);
403         cur_uptime = getuptime_minutes(&now);
404
405         if (fstatfs(cur_fd, &statfs) && cur_fd!=START_VALUE) {
406                 my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
407         }
408         else {
409                 if (min_free && statfs.f_bfree < min_free) 
410                         switch(cur_epoch) {
411                                 case 0: /* Uh oh. Our first file filled up all of the free space. Just bail out. */
412                                         my_log(LOG_ERR, "The first epoch filled up all the free space on disk. Bailing out.");
413                                         exit(1);
414                                 default:
415                                         my_log(LOG_INFO, "Disk almost full. I'm going to drop data. Max epochs = %d\n",cur_epoch);
416                                         cur_epoch = -1;
417                         }
418         }
419
420         /* Epoch length in minutes */
421         if ((cur_uptime - prev_uptime) > epoch_length || cur_fd<0 || cur_epoch==-1) {
422                 char nextname[MAX_PATH_LEN];
423                 int write_fd;
424                 prev_uptime = cur_uptime;
425                 cur_epoch = (cur_epoch + 1) % log_epochs;
426                 close(cur_fd);
427                 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
428                 if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC)) < 0) {
429                         my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
430                         exit(1);
431                 }
432                 update_cur_epoch_file(cur_epoch);
433                 ret_fd = write_fd;
434         }
435         else
436                 ret_fd = cur_fd;
437         return(ret_fd);
438 }
439
440 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
441 {
442         struct Flow **flowpp;
443
444 #ifdef WALL
445         flowpp = 0;
446 #endif
447
448         if (prev) flowpp = *prev;
449
450         while (where) {
451                 if (where->sip.s_addr == what->sip.s_addr
452                         && where->dip.s_addr == what->dip.s_addr
453                         && where->proto == what->proto) {
454                         switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
455                                 case 0:
456                                         /* Both unfragmented */
457                                         if ((what->sp == where->sp)
458                                                 && (what->dp == where->dp)) goto done;
459                                         break;
460                                 case 2:
461                                         /* Both fragmented */
462                                         if (where->id == what->id) goto done;
463                                         break;
464                         }
465                 }
466                 flowpp = &where->next;
467                 where = where->next;
468         }
469 done:
470         if (prev) *prev = flowpp;
471         return where;
472 }
473
474 int put_into(struct Flow *flow, int flag
475 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
476         , char *logbuf
477 #endif
478 )
479 {
480         int ret = 0;
481         hash_t h;
482         struct Flow *flown, **flowpp;
483 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
484         char buf[64];
485 #endif
486
487         h = hash_flow(flow);
488 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
489         sprintf(buf, " %x H:%04x", (unsigned) flow, h);
490         strcat(logbuf, buf);
491 #endif
492         pthread_mutex_lock(&flows_mutex[h]);
493         flowpp = &flows[h];
494         if (!(flown = find(flows[h], flow, &flowpp))) {
495                 /* No suitable flow found - add */
496                 if (flag == COPY_INTO) {
497                         if ((flown = mem_alloc())) {
498                                 copy_flow(flow, flown);
499                                 flow = flown;
500                         } else {
501 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
502                                 my_log(LOG_ERR, "%s %s. %s",
503                                         "mem_alloc():", strerror(errno), "packet lost");
504 #endif
505                                 return -1;
506                         }
507                 }
508                 flow->next = flows[h];
509                 flows[h] = flow;
510 #if ((DEBUG) & DEBUG_I)
511                 flows_total++;
512                 if (flow->flags & FLOW_FRAG) flows_fragmented++;
513 #endif
514 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
515                 if (flown) {
516                         sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
517                         strcat(logbuf, buf);
518                 }
519 #endif
520         } else {
521                 /* Found suitable flow - update */
522 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
523                 sprintf(buf, " +> %x", (unsigned) flown);
524                 strcat(logbuf, buf);
525 #endif
526                 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
527                         flown->mtime = flow->mtime;
528                 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
529                         flown->ctime = flow->ctime;
530                 flown->tcp_flags |= flow->tcp_flags;
531                 flown->size += flow->size;
532                 flown->pkts += flow->pkts;
533                 if (flow->flags & FLOW_FRAG) {
534                         /* Fragmented flow require some additional work */
535                         if (flow->flags & FLOW_TL) {
536                                 /*
537                                 ?FIXME?
538                                 Several packets with FLOW_TL (attack)
539                                 */
540                                 flown->sp = flow->sp;
541                                 flown->dp = flow->dp;
542                         }
543                         if (flow->flags & FLOW_LASTFRAG) {
544                                 /*
545                                 ?FIXME?
546                                 Several packets with FLOW_LASTFRAG (attack)
547                                 */
548                                 flown->sizeP = flow->sizeP;
549                         }
550                         flown->flags |= flow->flags;
551                         flown->sizeF += flow->sizeF;
552                         if ((flown->flags & FLOW_LASTFRAG)
553                                 && (flown->sizeF >= flown->sizeP)) {
554                                 /* All fragments received - flow reassembled */
555                                 *flowpp = flown->next;
556                                 pthread_mutex_unlock(&flows_mutex[h]);
557 #if ((DEBUG) & DEBUG_I)
558                                 flows_total--;
559                                 flows_fragmented--;
560 #endif
561                                 flown->id = 0;
562                                 flown->flags &= ~FLOW_FRAG;
563 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
564                                 strcat(logbuf," R");
565 #endif
566                                 ret = put_into(flown, MOVE_INTO
567 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
568                                                 , logbuf
569 #endif
570                                         );
571                         }
572                 }
573                 if (flag == MOVE_INTO) mem_free(flow);
574         }
575         pthread_mutex_unlock(&flows_mutex[h]);
576         return ret;
577 }
578
579 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
580 {
581         int i;
582
583         for (i = 0; i < fields; i++) {
584 #if ((DEBUG) & DEBUG_F)
585                 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
586 #endif
587                 switch (format[i]) {
588                         case NETFLOW_IPV4_SRC_ADDR:
589                                 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
590                                 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
591                                 break;
592
593                         case NETFLOW_IPV4_DST_ADDR:
594                                 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
595                                 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
596                                         my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
597                                 }
598                                 p += NETFLOW_IPV4_DST_ADDR_SIZE;
599                                 break;
600
601                         case NETFLOW_INPUT_SNMP:
602                                 *((uint16_t *) p) = htons(flow->iif);
603                                 p += NETFLOW_INPUT_SNMP_SIZE;
604                                 break;
605
606                         case NETFLOW_OUTPUT_SNMP:
607                                 *((uint16_t *) p) = htons(flow->oif);
608                                 p += NETFLOW_OUTPUT_SNMP_SIZE;
609                                 break;
610
611                         case NETFLOW_PKTS_32:
612                                 *((uint32_t *) p) = htonl(flow->pkts);
613                                 p += NETFLOW_PKTS_32_SIZE;
614                                 break;
615
616                         case NETFLOW_BYTES_32:
617                                 *((uint32_t *) p) = htonl(flow->size);
618                                 p += NETFLOW_BYTES_32_SIZE;
619                                 break;
620
621                         case NETFLOW_FIRST_SWITCHED:
622                                 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
623                                 p += NETFLOW_FIRST_SWITCHED_SIZE;
624                                 break;
625
626                         case NETFLOW_LAST_SWITCHED:
627                                 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
628                                 p += NETFLOW_LAST_SWITCHED_SIZE;
629                                 break;
630
631                         case NETFLOW_L4_SRC_PORT:
632                                 *((uint16_t *) p) = flow->sp;
633                                 p += NETFLOW_L4_SRC_PORT_SIZE;
634                                 break;
635
636                         case NETFLOW_L4_DST_PORT:
637                                 *((uint16_t *) p) = flow->dp;
638                                 p += NETFLOW_L4_DST_PORT_SIZE;
639                                 break;
640
641                         case NETFLOW_PROT:
642                                 *((uint8_t *) p) = flow->proto;
643                                 p += NETFLOW_PROT_SIZE;
644                                 break;
645
646                         case NETFLOW_SRC_TOS:
647                                 *((uint8_t *) p) = flow->tos;
648                                 p += NETFLOW_SRC_TOS_SIZE;
649                                 break;
650
651                         case NETFLOW_TCP_FLAGS:
652                                 *((uint8_t *) p) = flow->tcp_flags;
653                                 p += NETFLOW_TCP_FLAGS_SIZE;
654                                 break;
655
656                         case NETFLOW_VERSION:
657                                 *((uint16_t *) p) = htons(netflow->Version);
658                                 p += NETFLOW_VERSION_SIZE;
659                                 break;
660
661                         case NETFLOW_COUNT:
662                                 *((uint16_t *) p) = htons(emit_count);
663                                 p += NETFLOW_COUNT_SIZE;
664                                 break;
665
666                         case NETFLOW_UPTIME:
667                                 *((uint32_t *) p) = htonl(getuptime(&emit_time));
668                                 p += NETFLOW_UPTIME_SIZE;
669                                 break;
670
671                         case NETFLOW_UNIX_SECS:
672                                 *((uint32_t *) p) = htonl(emit_time.sec);
673                                 p += NETFLOW_UNIX_SECS_SIZE;
674                                 break;
675
676                         case NETFLOW_UNIX_NSECS:
677                                 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
678                                 p += NETFLOW_UNIX_NSECS_SIZE;
679                                 break;
680
681                         case NETFLOW_FLOW_SEQUENCE:
682                                 //*((uint32_t *) p) = htonl(emit_sequence);
683                                 *((uint32_t *) p) = 0;
684                                 p += NETFLOW_FLOW_SEQUENCE_SIZE;
685                                 break;
686
687                         case NETFLOW_PAD8:
688                         /* Unsupported (uint8_t) */
689                         case NETFLOW_ENGINE_TYPE:
690                         case NETFLOW_ENGINE_ID:
691                         case NETFLOW_FLAGS7_1:
692                         case NETFLOW_SRC_MASK:
693                         case NETFLOW_DST_MASK:
694                                 *((uint8_t *) p) = 0;
695                                 p += NETFLOW_PAD8_SIZE;
696                                 break;
697                         case NETFLOW_XID:
698                                 *((uint16_t *) p) = flow->tos;
699                                 p += NETFLOW_XID_SIZE;
700                                 break;
701                         case NETFLOW_PAD16:
702                         /* Unsupported (uint16_t) */
703                         case NETFLOW_SRC_AS:
704                         case NETFLOW_DST_AS:
705                         case NETFLOW_FLAGS7_2:
706                                 *((uint16_t *) p) = 0;
707                                 p += NETFLOW_PAD16_SIZE;
708                                 break;
709
710                         case NETFLOW_PAD32:
711                         /* Unsupported (uint32_t) */
712                         case NETFLOW_IPV4_NEXT_HOP:
713                         case NETFLOW_ROUTER_SC:
714                                 *((uint32_t *) p) = 0;
715                                 p += NETFLOW_PAD32_SIZE;
716                                 break;
717
718                         default:
719                                 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
720                                         format, i, format[i]);
721                                 exit(1);
722                 }
723         }
724 #if ((DEBUG) & DEBUG_F)
725         my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
726 #endif
727         return p;
728 }
729
730 void setuser() {
731         /*
732         Workaround for clone()-based threads
733         Try to change EUID independently of main thread
734         */
735         if (pw) {
736                 setgroups(0, NULL);
737                 setregid(pw->pw_gid, pw->pw_gid);
738                 setreuid(pw->pw_uid, pw->pw_uid);
739         }
740 }
741
742 void *emit_thread()
743 {
744         struct Flow *flow;
745         void *p;
746         struct timeval now;
747         struct timespec timeout;
748         int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
749
750         p = (void *) &emit_packet + netflow->HeaderSize;
751         timeout.tv_nsec = 0;
752
753         setuser();
754
755         for (;;) {
756                 pthread_mutex_lock(&emit_mutex);
757                 while (!flows_emit) {
758                         gettimeofday(&now, 0);
759                         timeout.tv_sec = now.tv_sec + emit_timeout;
760                         /* Do not wait until emit_packet will filled - it may be too long */
761                         if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
762                                 pthread_mutex_unlock(&emit_mutex);
763                                 goto sendit;
764                         }
765                 }
766                 flow = flows_emit;
767                 flows_emit = flows_emit->next;
768 #if ((DEBUG) & DEBUG_I)
769                 emit_queue--;
770 #endif          
771                 pthread_mutex_unlock(&emit_mutex);
772
773 #ifdef UPTIME_TRICK
774                 if (!emit_count) {
775                         gettime(&start_time);
776                         start_time.sec -= start_time_offset;
777                 }
778 #endif
779                 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
780                 mem_free(flow);
781                 emit_count++;
782 #ifdef PF2_DEBUG
783                 printf("Emit count = %d\n", emit_count);
784                 fflush(stdout);
785 #endif
786                 if (emit_count == netflow->MaxFlows) {
787                 sendit:
788                         gettime(&emit_time);
789                         p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
790                         size = netflow->HeaderSize + emit_count * netflow->FlowSize;
791                         /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
792                         if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
793                         peer_rot_cur = 0;
794                         for (i = 0; i < npeers; i++) {
795                                 if (peers[i].type == PEER_FILE) {
796                                                 if (netflow->SeqOffset)
797                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
798                                                 peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
799                                                 ret = write(peers[i].write_fd, emit_packet, size);
800                                                 if (ret < size) {
801
802 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
803                                                         my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
804                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
805 #endif
806 #undef MESSAGES
807                                                 }
808 #if ((DEBUG) & DEBUG_E)
809                                                 commaneelse {
810                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
811                                                                 emit_count, i + 1, peers[i].seq);
812                                                 }
813 #endif
814                                                 peers[i].seq += emit_count;
815
816                                                 /* Rate limit */
817                                                 if (emit_rate_bytes) {
818                                                         sent += size;
819                                                         delay = sent / emit_rate_bytes;
820                                                         if (delay) {
821                                                                 sent %= emit_rate_bytes;
822                                                                 timeout.tv_sec = 0;
823                                                                 timeout.tv_nsec = emit_rate_delay * delay;
824                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
825                                                         }
826                                                 }
827                                         }
828                                 else
829                                 if (peers[i].type == PEER_MIRROR) goto sendreal;
830                                 else
831                                 if (peers[i].type == PEER_ROTATE) 
832                                         if (peer_rot_cur++ == peer_rot_work) {
833                                         sendreal:
834                                                 if (netflow->SeqOffset)
835                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
836                                                 ret = send(peers[i].write_fd, emit_packet, size, 0);
837                                                 if (ret < size) {
838 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
839                                                         my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
840                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
841 #endif
842                                                 }
843 #if ((DEBUG) & DEBUG_E)
844                                                 commaneelse {
845                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
846                                                                 emit_count, i + 1, peers[i].seq);
847                                                 }
848 #endif
849                                                 peers[i].seq += emit_count;
850
851                                                 /* Rate limit */
852                                                 if (emit_rate_bytes) {
853                                                         sent += size;
854                                                         delay = sent / emit_rate_bytes;
855                                                         if (delay) {
856                                                                 sent %= emit_rate_bytes;
857                                                                 timeout.tv_sec = 0;
858                                                                 timeout.tv_nsec = emit_rate_delay * delay;
859                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
860                                                         }
861                                                 }
862                                         }
863                         }
864                         if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
865                         emit_sequence += emit_count;
866                         emit_count = 0;
867 #if ((DEBUG) & DEBUG_I)
868                         emit_pkts++;
869 #endif
870                 }
871         }
872 }       
873
874 void *unpending_thread()
875 {
876         struct timeval now;
877         struct timespec timeout;
878 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
879         char logbuf[256];
880 #endif
881
882         setuser();
883
884         timeout.tv_nsec = 0;
885         pthread_mutex_lock(&unpending_mutex);
886
887         for (;;) {
888                 while (!(pending_tail->flags & FLOW_PENDING)) {
889                         gettimeofday(&now, 0);
890                         timeout.tv_sec = now.tv_sec + unpending_timeout;
891                         pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
892                 }
893
894 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
895                 *logbuf = 0;
896 #endif
897                 if (put_into(pending_tail, COPY_INTO
898 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
899                                 , logbuf
900 #endif
901                         ) < 0) {
902 #if ((DEBUG) & DEBUG_I)
903                         pkts_lost_unpending++;
904 #endif                          
905                 }
906
907 #if ((DEBUG) & DEBUG_U)
908                 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
909 #endif
910
911                 pending_tail->flags = 0;
912                 pending_tail = pending_tail->next;
913 #if ((DEBUG) & DEBUG_I)
914                 pkts_pending_done++;
915 #endif
916         }
917 }
918
919 void *scan_thread()
920 {
921 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
922         char logbuf[256];
923 #endif
924         int i;
925         struct Flow *flow, **flowpp;
926         struct Time now;
927         struct timespec timeout;
928
929         setuser();
930
931         timeout.tv_nsec = 0;
932         pthread_mutex_lock(&scan_mutex);
933
934         for (;;) {
935                 gettime(&now);
936                 timeout.tv_sec = now.sec + scan_interval;
937                 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
938
939                 gettime(&now);
940 #if ((DEBUG) & DEBUG_S)
941                 my_log(LOG_DEBUG, "S: %d", now.sec);
942 #endif
943                 for (i = 0; i < 1 << HASH_BITS ; i++) {
944                         pthread_mutex_lock(&flows_mutex[i]);
945                         flow = flows[i];
946                         flowpp = &flows[i];
947                         while (flow) {
948                                 if (flow->flags & FLOW_FRAG) {
949                                         /* Process fragmented flow */
950                                         if ((now.sec - flow->mtime.sec) > frag_lifetime) {
951                                                 /* Fragmented flow expired - put it into special chain */
952 #if ((DEBUG) & DEBUG_I)
953                                                 flows_fragmented--;
954                                                 flows_total--;
955 #endif
956                                                 *flowpp = flow->next;
957                                                 flow->id = 0;
958                                                 flow->flags &= ~FLOW_FRAG;
959                                                 flow->next = scan_frag_dreg;
960                                                 scan_frag_dreg = flow;
961                                                 flow = *flowpp;
962                                                 continue;
963                                         }
964                                 } else {
965                                         /* Flow is not frgamented */
966                                         if ((now.sec - flow->mtime.sec) > inactive_lifetime
967                                                 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
968                                                 /* Flow expired */
969 #if ((DEBUG) & DEBUG_S)
970                                                 my_log(LOG_DEBUG, "S: E %x", flow);
971 #endif
972 #if ((DEBUG) & DEBUG_I)
973                                                 flows_total--;
974 #endif
975                                                 *flowpp = flow->next;
976                                                 pthread_mutex_lock(&emit_mutex);
977                                                 flow->next = flows_emit;
978                                                 flows_emit = flow;
979 #if ((DEBUG) & DEBUG_I)
980                                                 emit_queue++;
981 #endif                          
982                                                 pthread_mutex_unlock(&emit_mutex);
983                                                 flow = *flowpp;
984                                                 continue;
985                                         }
986                                 }
987                                 flowpp = &flow->next;
988                                 flow = flow->next;
989                         } /* chain loop */
990                         pthread_mutex_unlock(&flows_mutex[i]);
991                 } /* hash loop */
992                 if (flows_emit) pthread_cond_signal(&emit_cond);
993
994                 while (scan_frag_dreg) {
995                         flow = scan_frag_dreg;
996                         scan_frag_dreg = flow->next;
997 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
998                         *logbuf = 0;
999 #endif
1000                         put_into(flow, MOVE_INTO
1001 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1002                                 , logbuf
1003 #endif
1004                         );
1005 #if ((DEBUG) & DEBUG_S)
1006                         my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
1007 #endif
1008                 }
1009         }
1010 }
1011
1012 void *cap_thread()
1013 {
1014         struct ulog_packet_msg *ulog_msg;
1015         struct ip *nl;
1016         void *tl;
1017         struct Flow *flow;
1018         int len, off_frag, psize;
1019 #if ((DEBUG) & DEBUG_C)
1020         char buf[64];
1021         char logbuf[256];
1022 #endif
1023
1024         setuser();
1025
1026         while (!killed) {
1027                 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1028                 if (len <= 0) {
1029                         my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1030                         continue;
1031                 }
1032                 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1033
1034 #if ((DEBUG) & DEBUG_C)
1035                         sprintf(logbuf, "C: %d", ulog_msg->data_len);
1036 #endif
1037
1038                         nl = (void *) &ulog_msg->payload;
1039                         psize = ulog_msg->data_len;
1040
1041                         /* Sanity check */
1042                         if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1043 #if ((DEBUG) & DEBUG_C)
1044                                 strcat(logbuf, " U");
1045                                 my_log(LOG_DEBUG, "%s", logbuf);
1046 #endif
1047 #if ((DEBUG) & DEBUG_I)
1048                                 pkts_ignored++;
1049 #endif
1050                                 continue;
1051                         }
1052
1053                         if (pending_head->flags) {
1054 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1055                                 my_log(LOG_ERR,
1056 # if ((DEBUG) & DEBUG_C)
1057                                         "%s %s %s", logbuf,
1058 # else
1059                                         "%s %s",
1060 # endif
1061                                         "pending queue full:", "packet lost");
1062 #endif
1063 #if ((DEBUG) & DEBUG_I)
1064                                 pkts_lost_capture++;
1065 #endif
1066                                 goto done;
1067                         }
1068
1069 #if ((DEBUG) & DEBUG_I)
1070                         pkts_total++;
1071 #endif
1072
1073                         flow = pending_head;
1074
1075                         /* ?FIXME? Add sanity check for ip_len? */
1076                         flow->size = ntohs(nl->ip_len);
1077 #if ((DEBUG) & DEBUG_I)
1078                         size_total += flow->size;
1079 #endif
1080
1081                         flow->sip = nl->ip_src;
1082                         flow->dip = nl->ip_dst;
1083                         if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
1084                                 my_log(LOG_INFO, "Received test flow to corewars.org");
1085                         }
1086                         flow->iif = snmp_index(ulog_msg->indev_name);
1087                         flow->oif = snmp_index(ulog_msg->outdev_name);
1088                         flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1089                         flow->proto = nl->ip_p;
1090                         flow->id = 0;
1091                         flow->tcp_flags = 0;
1092                         flow->pkts = 1;
1093                         flow->sizeF = 0;
1094                         flow->sizeP = 0;
1095                         /* Packets captured from OUTPUT table didn't contains valid timestamp */
1096                         if (ulog_msg->timestamp_sec) {
1097                                 flow->ctime.sec = ulog_msg->timestamp_sec;
1098                                 flow->ctime.usec = ulog_msg->timestamp_usec;
1099                         } else gettime(&flow->ctime);
1100                         flow->mtime = flow->ctime;
1101
1102                         off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1103
1104                         /*
1105                         Offset (from network layer) to transport layer header/IP data
1106                         IOW IP header size ;-)
1107
1108                         ?FIXME?
1109                         Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1110                         */
1111                         off_tl = nl->ip_hl << 2;
1112                         tl = (void *) nl + off_tl;
1113
1114                         /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1115                         flow->sizeF = ntohs(nl->ip_len) - off_tl;
1116                         psize -= off_tl;
1117                         if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1118                         if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1119
1120                         if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1121                                 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1122 #if ((DEBUG) & DEBUG_C)
1123                                 strcat(logbuf, " F");
1124 #endif
1125 #if ((DEBUG) & DEBUG_I)
1126                                 pkts_total_fragmented++;
1127 #endif
1128                                 flow->flags |= FLOW_FRAG;
1129                                 flow->id = nl->ip_id;
1130
1131                                 if (!(ntohs(nl->ip_off) & IP_MF)) {
1132                                         /* Packet whith IP_MF contains information about whole datagram size */
1133                                         flow->flags |= FLOW_LASTFRAG;
1134                                         /* size = frag_offset*8 + data_size */
1135                                         flow->sizeP = off_frag + flow->sizeF;
1136                                 }
1137                         }
1138
1139 #if ((DEBUG) & DEBUG_C)
1140                         sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1141                         strcat(logbuf, buf);
1142                         sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1143                         strcat(logbuf, buf);
1144 #endif
1145
1146                         /*
1147                         Fortunately most interesting transport layer information fit
1148                         into first 8 bytes of IP data field (minimal nonzero size).
1149                         Thus we don't need actual packet reassembling to build whole
1150                         transport layer data. We only check the fragment offset for
1151                         zero value to find packet with this information.
1152                         */
1153                         if (!off_frag && psize >= 8) {
1154                                 switch (flow->proto) {
1155                                         case IPPROTO_TCP:
1156                                         case IPPROTO_UDP:
1157                                                 flow->sp = ((struct udphdr *)tl)->uh_sport;
1158                                                 flow->dp = ((struct udphdr *)tl)->uh_dport;
1159                                                 goto tl_known;
1160
1161 #ifdef ICMP_TRICK
1162                                         case IPPROTO_ICMP:
1163                                                 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1164                                                 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1165                                                 goto tl_known;
1166 #endif
1167 #ifdef ICMP_TRICK_CISCO
1168                                         case IPPROTO_ICMP:
1169                                                 flow->dp = *((int32_t *) tl);
1170                                                 goto tl_known;
1171 #endif
1172
1173                                         default:
1174                                                 /* Unknown transport layer */
1175 #if ((DEBUG) & DEBUG_C)
1176                                                 strcat(logbuf, " U");
1177 #endif
1178                                                 flow->sp = 0;
1179                                                 flow->dp = 0;
1180                                                 break;
1181
1182                                         tl_known:
1183 #if ((DEBUG) & DEBUG_C)
1184                                                 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1185                                                 strcat(logbuf, buf);
1186 #endif
1187                                                 flow->flags |= FLOW_TL;
1188                                 }
1189                         }
1190
1191                         /* Check for tcp flags presence (including CWR and ECE). */
1192                         if (flow->proto == IPPROTO_TCP
1193                                 && off_frag < 16
1194                                 && psize >= 16 - off_frag) {
1195                                 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1196 #if ((DEBUG) & DEBUG_C)
1197                                 sprintf(buf, " TCP:%x", flow->tcp_flags);
1198                                 strcat(logbuf, buf);
1199 #endif
1200                         }
1201
1202 #if ((DEBUG) & DEBUG_C)
1203                         sprintf(buf, " => %x", (unsigned) flow);
1204                         strcat(logbuf, buf);
1205                         my_log(LOG_DEBUG, "%s", logbuf);
1206 #endif
1207
1208 #if ((DEBUG) & DEBUG_I)
1209                         pkts_pending++;
1210                         pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1211                         if (pending_queue_trace < pending_queue_trace_candidate)
1212                                 pending_queue_trace = pending_queue_trace_candidate;
1213 #endif
1214
1215                         /* Flow complete - inform unpending_thread() about it */
1216                         pending_head->flags |= FLOW_PENDING;
1217                         pending_head = pending_head->next;
1218                 done:
1219                         pthread_cond_signal(&unpending_cond);
1220                 }
1221         }
1222         return 0;
1223 }
1224
1225 int main(int argc, char **argv)
1226 {
1227         char errpbuf[512];
1228         char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1229         int c, i, write_fd, memory_limit = 0;
1230         struct addrinfo hints, *res;
1231         struct sockaddr_in saddr;
1232         pthread_attr_t tattr;
1233         struct sigaction sigact;
1234         static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1235         struct timeval timeout;
1236
1237         sched_min = sched_get_priority_min(SCHED);
1238         sched_max = sched_get_priority_max(SCHED);
1239
1240         memset(&saddr, 0 , sizeof(saddr));
1241         memset(&hints, 0 , sizeof(hints));
1242         hints.ai_flags = AI_PASSIVE;
1243         hints.ai_family = AF_INET;
1244         hints.ai_socktype = SOCK_DGRAM;
1245
1246         /* Process command line options */
1247
1248         opterr = 0;
1249         while ((c = my_getopt(argc, argv, parms)) != -1) {
1250                 switch (c) {
1251                         case '?':
1252                                 usage();
1253
1254                         case 'h':
1255                                 usage();
1256                 }
1257         }
1258
1259         if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1260         if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1261         if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1262         if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1263         if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1264         if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1265         if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1266         if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1267         if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1268         if (parms[nflag].count) {
1269                 switch (atoi(parms[nflag].arg)) {
1270                         case 1:
1271                                 netflow = &NetFlow1;
1272                                 break;
1273
1274                         case 5:
1275                                 break;
1276
1277                         case 7:
1278                                 netflow = &NetFlow7;
1279                                 break;
1280
1281                         default:
1282                                 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1283                                 exit(1);
1284                 }
1285         }
1286         if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1287         if (parms[lflag].count) {
1288                 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1289                         *log_suffix++ = 0;
1290                         if (*log_suffix) {
1291                                 sprintf(errpbuf, "[%s]", log_suffix);
1292                                 strcat(ident, errpbuf);
1293                         }
1294                 }
1295                 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1296                 if (log_suffix) *--log_suffix = ':';
1297         }
1298         if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1299         err_malloc:
1300                 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1301                 exit(1);
1302         }
1303         sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1304         if (parms[qflag].count) {
1305                 pending_queue_length = atoi(parms[qflag].arg);
1306                 if (pending_queue_length < 1) {
1307                         fprintf(stderr, "Illegal %s\n", "pending queue length");
1308                         exit(1);
1309                 }
1310         }
1311         if (parms[rflag].count) {
1312                 schedp.sched_priority = atoi(parms[rflag].arg);
1313                 if (schedp.sched_priority
1314                         && (schedp.sched_priority < sched_min
1315                                 || schedp.sched_priority > sched_max)) {
1316                         fprintf(stderr, "Illegal %s\n", "realtime priority");
1317                         exit(1);
1318                 }
1319         }
1320         if (parms[Bflag].count) {
1321                 sockbufsize = atoi(parms[Bflag].arg) << 10;
1322         }
1323         if (parms[bflag].count) {
1324                 bulk_quantity = atoi(parms[bflag].arg);
1325                 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1326                         fprintf(stderr, "Illegal %s\n", "bulk size");
1327                         exit(1);
1328                 }
1329         }
1330         if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1331         if (parms[Xflag].count) {
1332                 for(i = 0; parms[Xflag].arg[i]; i++)
1333                         if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1334                 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1335                         goto err_malloc;
1336                 rule = strtok(parms[Xflag].arg, ":");
1337                 for (i = 0; rule; i++) {
1338                         snmp_rules[i].len = strlen(rule);
1339                         if (snmp_rules[i].len > IFNAMSIZ) {
1340                                 fprintf(stderr, "Illegal %s\n", "interface basename");
1341                                 exit(1);
1342                         }
1343                         strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1344                         if (!*(rule - 1)) *(rule - 1) = ',';
1345                         rule = strtok(NULL, ",");
1346                         if (!rule) {
1347                                 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1348                                 exit(1);
1349                         }
1350                         snmp_rules[i].base = atoi(rule);
1351                         *(rule - 1) = ':';
1352                         rule = strtok(NULL, ":");
1353                 }
1354                 nsnmp_rules = i;
1355         }
1356         if (parms[tflag].count)
1357                 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1358         if (parms[aflag].count) {
1359                 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1360                 bad_lhost:
1361                         fprintf(stderr, "Illegal %s\n", "source address");
1362                         exit(1);
1363                 } else {
1364                         saddr = *((struct sockaddr_in *) res->ai_addr);
1365                         freeaddrinfo(res);
1366                 }
1367         }
1368         if (parms[uflag].count) 
1369                 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1370                         fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1371                         exit(1);
1372                 }
1373
1374
1375         /* Process collectors parameters. Brrrr... :-[ */
1376
1377         npeers = argc - optind;
1378         if (npeers > 1) {
1379                 /* Send to remote Netflow collector */
1380                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1381                 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1382                         dhost = argv[i];
1383                         if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1384                         *dport++ = 0;
1385                         if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1386                                 fprintf(stderr, "socket(): %s\n", strerror(errno));
1387                                 exit(1);
1388                         }
1389                         peers[npeers].write_fd = write_fd;
1390                         peers[npeers].type = PEER_MIRROR;
1391                         peers[npeers].laddr = saddr;
1392                         peers[npeers].seq = 0;
1393                         if ((lhost = strchr(dport, '/'))) {
1394                                 *lhost++ = 0;
1395                                 if ((type = strchr(lhost, '/'))) {
1396                                         *type++ = 0;
1397                                         switch (*type) {
1398                                                 case 0:
1399                                                 case 'm':
1400                                                         break;
1401
1402                                                 case 'r':
1403                                                         peers[npeers].type = PEER_ROTATE;
1404                                                         npeers_rot++;
1405                                                         break;
1406
1407                                                 default:
1408                                                         goto bad_collector;
1409                                         }
1410                                 }
1411                                 if (*lhost) {
1412                                         if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1413                                         peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1414                                         freeaddrinfo(res);
1415                                 }
1416                         }
1417                         if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1418                                                 sizeof(struct sockaddr_in))) {
1419                                 fprintf(stderr, "bind(): %s\n", strerror(errno));
1420                                 exit(1);
1421                         }
1422                         if (getaddrinfo(dhost, dport, &hints, &res)) {
1423 bad_collector:
1424                                 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1425                                 exit(1);
1426                         }
1427                         peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1428                         freeaddrinfo(res);
1429                         if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1430                                                 sizeof(struct sockaddr_in))) {
1431                                 fprintf(stderr, "connect(): %s\n", strerror(errno));
1432                                 exit(1);
1433                         }
1434
1435                         /* Restore command line */
1436                         if (type) *--type = '/';
1437                         if (lhost) *--lhost = '/';
1438                         *--dport = ':';
1439                 }
1440         }
1441         else if (parms[fflag].count) {
1442                 // log into a file
1443                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1444                 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1445                 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1446                 
1447                 peers[npeers].write_fd = START_VALUE;
1448                 peers[npeers].type = PEER_FILE;
1449                 peers[npeers].seq = 0;
1450                 npeers++;
1451         }
1452         else 
1453                 usage();
1454
1455
1456         if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1457         ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1458         if (!ulog_handle) {
1459                 fprintf(stderr, "libipulog initialization error: %s",
1460                         ipulog_strerror(ipulog_errno));
1461                 exit(1);
1462         }
1463         if (sockbufsize)
1464                 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1465                         &sockbufsize, sizeof(sockbufsize)) < 0)
1466                         fprintf(stderr, "setsockopt(): %s", strerror(errno));
1467
1468         /* Daemonize (if log destination stdout-free) */
1469
1470         my_log_open(ident, verbosity, log_dest);
1471         if (!(log_dest & 2)) {
1472                 /* Crash-proofing - Sapan*/
1473                 while (1) {
1474                         int pid=fork();
1475                         if (pid==-1) {
1476                                         fprintf(stderr, "fork(): %s", strerror(errno));
1477                                         exit(1);
1478                         }
1479                         else if (pid==0) {
1480                                         setsid();
1481                                         freopen("/dev/null", "r", stdin);
1482                                         freopen("/dev/null", "w", stdout);
1483                                         freopen("/dev/null", "w", stderr);
1484                                         break;
1485                         }
1486                         else {
1487                                 while (wait3(NULL,0,NULL) < 1);
1488                         }
1489                 }
1490         } else {
1491                 setvbuf(stdout, (char *)0, _IONBF, 0);
1492                 setvbuf(stderr, (char *)0, _IONBF, 0);
1493         }
1494
1495         pid = getpid();
1496         sprintf(errpbuf, "[%ld]", (long) pid);
1497         strcat(ident, errpbuf);
1498
1499         /* Initialization */
1500
1501         hash_init(); /* Actually for crc16 only */
1502         mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1503         for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1504
1505 #ifdef UPTIME_TRICK
1506         /* Hope 12 days is enough :-/ */
1507         start_time_offset = 1 << 20;
1508
1509         /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1510 #endif
1511         gettime(&start_time);
1512
1513         /*
1514         Build static pending queue as circular buffer.
1515         */
1516         if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1517         pending_tail = pending_head;
1518         for (i = pending_queue_length - 1; i--;) {
1519                 if (!(pending_tail->next = mem_alloc())) {
1520                 err_mem_alloc:
1521                         my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1522                         exit(1);
1523                 }
1524                 pending_tail = pending_tail->next;
1525         }
1526         pending_tail->next = pending_head;
1527         pending_tail = pending_head;
1528
1529         sigemptyset(&sig_mask);
1530         sigact.sa_handler = &sighandler;
1531         sigact.sa_mask = sig_mask;
1532         sigact.sa_flags = 0;
1533         sigaddset(&sig_mask, SIGTERM);
1534         sigaction(SIGTERM, &sigact, 0);
1535 #if ((DEBUG) & DEBUG_I)
1536         sigaddset(&sig_mask, SIGUSR1);
1537         sigaction(SIGUSR1, &sigact, 0);
1538 #endif
1539         if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1540                 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1541                 exit(1);
1542         }
1543
1544         my_log(LOG_INFO, "Starting %s...", VERSION);
1545
1546         if (parms[cflag].count) {
1547                 if (chdir(parms[cflag].arg) || chroot(".")) {
1548                         my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1549                         exit(1);
1550                 }
1551         }
1552
1553         schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1554         pthread_attr_init(&tattr);
1555         for (i = 0; i < THREADS - 1; i++) {
1556                 if (schedp.sched_priority > 0) {
1557                         if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1558                                 (pthread_attr_setschedparam(&tattr, &schedp))) {
1559                                 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1560                                 exit(1);
1561                         }
1562                 }
1563                 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1564                         my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1565                         exit(1);
1566                 }
1567                 pthread_detach(thid);
1568                 schedp.sched_priority++;
1569         }
1570
1571         if (pw) {
1572                 if (setgroups(0, NULL)) {
1573                         my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1574                         exit(1);
1575                 }
1576                 if (setregid(pw->pw_gid, pw->pw_gid)) {
1577                         my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1578                         exit(1);
1579                 }
1580                 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1581                         my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1582                         exit(1);
1583                 }
1584         }
1585
1586         if (!(pidfile = fopen(pidfilepath, "w")))
1587                 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1588         else {
1589                 fprintf(pidfile, "%ld\n", (long) pid);
1590                 fclose(pidfile);
1591         }
1592
1593         my_log(LOG_INFO, "pid: %d", pid);
1594         my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1595                 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1596                 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1597                 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1598                 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1599                 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1600                 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1601         for (i = 0; i < nsnmp_rules; i++) {
1602                 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1603                         i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1604         }
1605         for (i = 0; i < npeers; i++) {
1606                 switch (peers[i].type) {
1607                         case PEER_MIRROR:
1608                                 c = 'm';
1609                                 break;
1610                         case PEER_ROTATE:
1611                                 c = 'r';
1612                                 break;
1613                 }
1614                 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1615                 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1616                         inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1617         }
1618
1619         pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1620
1621         timeout.tv_usec = 0;
1622         while (!killed
1623                 || (total_elements - free_elements - pending_queue_length)
1624                 || emit_count
1625                 || pending_tail->flags) {
1626
1627                 if (!sigs) {
1628                         timeout.tv_sec = scan_interval;
1629                         select(0, 0, 0, 0, &timeout);
1630                 }
1631
1632                 if (sigs & SIGTERM_MASK && !killed) {
1633                         sigs &= ~SIGTERM_MASK;
1634                         my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1635                         scan_interval = 1;
1636                         frag_lifetime = -1;
1637                         active_lifetime = -1;
1638                         inactive_lifetime = -1;
1639                         emit_timeout = 1;
1640                         unpending_timeout = 1;
1641                         killed = 1;
1642                         pthread_cond_signal(&scan_cond);
1643                         pthread_cond_signal(&unpending_cond);
1644                 }
1645
1646 #if ((DEBUG) & DEBUG_I)
1647                 if (sigs & SIGUSR1_MASK) {
1648                         sigs &= ~SIGUSR1_MASK;
1649                         info_debug();
1650                 }
1651 #endif
1652         }
1653         remove(pidfilepath);
1654 #if ((DEBUG) & DEBUG_I)
1655         info_debug();
1656 #endif
1657         my_log(LOG_INFO, "Done.");
1658 #ifdef WALL
1659         return 0;
1660 #endif
1661 }