[fprobe-ulog.git] / src / fprobe-ulog.c
1 /*
2         Copyright (C) Slava Astashonok <sla@0n.ru>
3
4         This program is free software; you can redistribute it and/or
5         modify it under the terms of the GNU General Public License.
6
7         $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
8
9         7/11/2007       Sapan Bhatia <sapanb@cs.princeton.edu> 
10                         
11                 Added data collection (-f) functionality, xid support in the header and log file
12                 rotation.
13 */
14
15 #include <common.h>
16
17 /* stdout, stderr, freopen() */
18 #include <stdio.h>
19
20 /* atoi(), exit() */
21 #include <stdlib.h>
22
23 /* getopt(), alarm(), getpid(), setsid(), chdir() */
24 #include <unistd.h>
25
26 /* strerror() */
27 #include <string.h>
28
29 /* sig*() */
30 #include <signal.h>
31
32 /* statfs() */
33
34 #include <sys/statfs.h>
35
36 #include <libipulog/libipulog.h>
37 struct ipulog_handle {
38         int fd;
39         u_int8_t blocking;
40         struct sockaddr_nl local;
41         struct sockaddr_nl peer;
42         struct nlmsghdr* last_nlhdr;
43 };
44
45 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
46 #include <sys/types.h>
47 #include <netinet/in_systm.h>
48 #include <sys/socket.h>
49 #include <netinet/in.h>
50 #include <arpa/inet.h>
51 #include <netinet/ip.h>
52 #include <netinet/tcp.h>
53 #include <netinet/udp.h>
54 #include <netinet/ip_icmp.h>
55 #include <net/if.h>
56
57 #include <sys/param.h>
58 #include <pwd.h>
59 #ifdef OS_LINUX
60 #include <grp.h>
61 #endif
62
63 /* pthread_*() */
64 #include <pthread.h>
65
66 /* errno */
67 #include <errno.h>
68
69 /* getaddrinfo() */
70 #include <netdb.h>
71
72 /* nanosleep() */
73 #include <time.h>
74
75 /* gettimeofday() */
76 #include <sys/time.h>
77
78 /* scheduling */
79 #include <sched.h>
80
81 /* select() (POSIX)*/
82 #include <sys/select.h>
83
84 /* open() */
85 #include <sys/stat.h>
86 #include <fcntl.h>
87
88 #include <fprobe-ulog.h>
89 #include <my_log.h>
90 #include <my_getopt.h>
91 #include <netflow.h>
92 #include <hash.h>
93 #include <mem.h>
94
95 enum {
96         aflag,
97         Bflag,
98         bflag,
99         cflag,
100         dflag,
101         Dflag,
102         eflag,
103         Eflag,
104         fflag,
105         gflag,
106         hflag,
107         lflag,
108         mflag,
109         Mflag,
110         nflag,
111         qflag,
112         rflag,
113         sflag,
114         tflag,
115         Tflag,
116         Uflag,
117         uflag,
118         vflag,
119         Wflag,
120         Xflag,
121 };
122
123 static struct getopt_parms parms[] = {
124         {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
125         {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
126         {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
127         {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
128         {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
129         {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
130         {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
131         {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
132         {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
133         {'h', 0, 0, 0},
134         {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
135         {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
136         {'M', 0, 0, 0},
137         {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
138         {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
139         {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
140         {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
141         {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
142         {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
143         {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
144         {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
145         {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
146         {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
147         {0, 0, 0, 0}
148 };
149
150 extern char *optarg;
151 extern int optind, opterr, optopt;
152 extern int errno;
153
154 extern struct NetFlow NetFlow1;
155 extern struct NetFlow NetFlow5;
156 extern struct NetFlow NetFlow7;
157
158 #define START_VALUE -5
159 #define mark_is_tos parms[Mflag].count
160 static unsigned scan_interval = 5;
161 static int min_free = 0;
162 static int frag_lifetime = 30;
163 static int inactive_lifetime = 60;
164 static int active_lifetime = 300;
165 static int sockbufsize;
166 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
167 #if (MEM_BITS == 0) || (MEM_BITS == 16)
168 #define BULK_QUANTITY 10000
169 #else
170 #define BULK_QUANTITY 200
171 #endif
172
173 static unsigned epoch_length=60, log_epochs=1; 
174 static unsigned cur_epoch=0,prev_uptime=0;
175
176 static unsigned bulk_quantity = BULK_QUANTITY;
177 static unsigned pending_queue_length = 100;
178 static struct NetFlow *netflow = &NetFlow5;
179 static unsigned verbosity = 6;
180 static unsigned log_dest = MY_LOG_SYSLOG;
181 static struct Time start_time;
182 static long start_time_offset;
183 static int off_tl;
184 /* From mem.c */
185 extern unsigned total_elements;
186 extern unsigned free_elements;
187 extern unsigned total_memory;
188 #if ((DEBUG) & DEBUG_I)
189 static unsigned emit_pkts, emit_queue;
190 static uint64_t size_total;
191 static unsigned pkts_total, pkts_total_fragmented;
192 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
193 static unsigned pkts_pending, pkts_pending_done;
194 static unsigned pending_queue_trace, pending_queue_trace_candidate;
195 static unsigned flows_total, flows_fragmented;
196 #endif
197 static unsigned emit_count;
198 static uint32_t emit_sequence;
199 static unsigned emit_rate_bytes, emit_rate_delay;
200 static struct Time emit_time;
201 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
202 static pthread_t thid;
203 static sigset_t sig_mask;
204 static struct sched_param schedp;
205 static int sched_min, sched_max;
206 static int npeers, npeers_rot;
207 static struct peer *peers;
208 static int sigs;
209
210 static struct Flow *flows[1 << HASH_BITS];
211 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
212
213 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
214 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
215
216 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
217 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
218 static struct Flow *pending_head, *pending_tail;
219 static struct Flow *scan_frag_dreg;
220
221 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
222 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
223 static struct Flow *flows_emit;
224
225 static char ident[256] = "fprobe-ulog";
226 static FILE *pidfile;
227 static char *pidfilepath;
228 static pid_t pid;
229 static int killed;
230 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
231 static struct ipulog_handle *ulog_handle;
232 static uint32_t ulog_gmask = 1;
233 static char *cap_buf;
234 static int nsnmp_rules;
235 static struct snmp_rule *snmp_rules;
236 static struct passwd *pw = 0;
237
238 void usage()
239 {
240         fprintf(stdout,
241                 "fprobe-ulog: a NetFlow probe. Version %s\n"
242                 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
243                 "\n"
244                 "-h\t\tDisplay this help\n"
245                 "-U <mask>\tULOG group bitwise mask [1]\n"
246                 "-s <seconds>\tHow often to scan for expired flows [5]\n"
247                 "-g <seconds>\tFragmented flow lifetime [30]\n"
248                 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
249                 "-f <filename>\tLog flow data in a file\n"
250                 "-G <graduation period>\tRotate logs on a 0=hourly, 1=daily basis\n"
251                 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
252                 "-a <address>\tUse <address> as source for NetFlow flow\n"
253                 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
254                 "-M\t\tUse netfilter mark value as ToS flag\n"
255                 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
256                 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
257                 "-q <flows>\tPending queue length [100]\n"
258                 "-B <kilobytes>\tKernel capture buffer size [0]\n"
259                 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
260                 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
261                 "-c <directory>\tDirectory to chroot to\n"
262                 "-u <user>\tUser to run as\n"
263                 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
264                 "-l <[dst][:id]>\tLog destination and log/pidfile identifier [1]\n"
265                 "-y <remote:port>\tAddress of the NetFlow collector\n"
266                 "-f <writable file>\tFile to write data into\n"
267                 "-T <n>\tRotate log file every n epochs\n"
268                 "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
269                 "-E <[1..60]>\tSize of an epoch in minutes\n"
270                 "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
271                 ,
272                 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
273         exit(0);
274 }
275
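/*
 * Illustrative invocation (values are only examples, not recommendations):
 * capture ULOG group 1, log flow data to /var/log/fprobe/flows with
 * 10-minute epochs, keep 144 epoch files and preserve 1000 free disk blocks:
 *
 *     fprobe-ulog -U 1 -f /var/log/fprobe/flows -E 10 -T 144 -D 1000
 */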
276 #if ((DEBUG) & DEBUG_I)
277 void info_debug()
278 {
279         my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
280                 pkts_total, pkts_total_fragmented, size_total,
281                 pkts_pending - pkts_pending_done, pending_queue_trace);
282         my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
283                 pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
284         my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
285                 flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
286         my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
287                 total_elements, free_elements, total_memory);
288 }
289 #endif
290
291 void sighandler(int sig)
292 {
293         switch (sig) {
294                 case SIGTERM:
295                         sigs |= SIGTERM_MASK;
296                         break;
297 #if ((DEBUG) & DEBUG_I)
298                 case SIGUSR1:
299                         sigs |= SIGUSR1_MASK;
300                         break;
301 #endif
302         }
303 }
304
305 void gettime(struct Time *now)
306 {
307         struct timeval t;
308
309         gettimeofday(&t, 0);
310         now->sec = t.tv_sec;
311         now->usec = t.tv_usec;
312 }
313
314
315 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
316 {
317         return (t1->sec - t2->sec)/60;
318 }
319
320 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
321 {
322         return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
323 }
324
325 /* Uptime in milliseconds */
326 uint32_t getuptime(struct Time *t)
327 {
328         /* Maximum uptime is about 49/2 days */
329         return cmpmtime(t, &start_time);
330 }
331
332 /* Uptime in minutes */
333 uint32_t getuptime_minutes(struct Time *t)
334 {
335         /* Maximum uptime is about 49/2 days */
336         return cmpMtime(t, &start_time);
337 }
338
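/*
 * Hash a flow for the flows[] table. Fragmented flows are hashed over the
 * fragment-related prefix of struct Flow (Flow_F); otherwise the prefix that
 * also covers the transport-layer tuple (Flow_TL) is used.
 */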
339 hash_t hash_flow(struct Flow *flow)
340 {
341         if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
342         else return hash(flow, sizeof(struct Flow_TL));
343 }
344
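/*
 * Map an interface name to an SNMP index: first try the -X conversion rules
 * (basename match, numeric suffix added to the rule's base), then fall back
 * to if_nametoindex(). An empty name yields 0, an unknown one (uint16_t)-1.
 */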
345 uint16_t snmp_index(char *name) {
346         uint32_t i;
347
348         if (!*name) return 0;
349
350         for (i = 0; (int) i < nsnmp_rules; i++) {
351                 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
352                 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
353         }
354
355         if ((i = if_nametoindex(name))) return i;
356
357         return -1;
358 }
359
360 inline void copy_flow(struct Flow *src, struct Flow *dst)
361 {
362         dst->iif = src->iif;
363         dst->oif = src->oif;
364         dst->sip = src->sip;
365         dst->dip = src->dip;
366         dst->tos = src->tos;
367         dst->proto = src->proto;
368         dst->tcp_flags = src->tcp_flags;
369         dst->id = src->id;
370         dst->sp = src->sp;
371         dst->dp = src->dp;
372         dst->pkts = src->pkts;
373         dst->size = src->size;
374         dst->sizeF = src->sizeF;
375         dst->sizeP = src->sizeP;
376         dst->ctime = src->ctime;
377         dst->mtime = src->mtime;
378         dst->flags = src->flags;
379 }
380
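/*
 * Restore the last epoch counter persisted in /tmp/fprobe_last_epoch so that
 * a restarted probe resumes log rotation where it left off (see also -W).
 */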
381 void get_cur_epoch() {
382         int fd, len;
383         fd = open("/tmp/fprobe_last_epoch",O_RDONLY);
384         if (fd != -1) {
385                 char snum[7];
386                 len = read(fd, snum, sizeof(snum) - 1);
387                 if (len > 0) { snum[len] = 0; sscanf(snum,"%d",&cur_epoch); }
388                 close(fd);
389         }
390 }
391
392
393 void update_cur_epoch_file(int n) {
394         int fd, len;
395         char snum[7];
396         len=snprintf(snum,sizeof(snum),"%d",n);
397         fd = open("/tmp/fprobe_last_epoch",O_WRONLY|O_CREAT|O_TRUNC, 0644);
398         if (fd == -1) {
399                 my_log(LOG_ERR, "open() failed: /tmp/fprobe_last_epoch. The next restart will resume logging from epoch id 0.");
400                 return;
401         }
402         write(fd, snum, len);
403         close(fd);
404 }
405
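/*
 * Return the descriptor to write flow data to, rotating to "<fname>.<epoch>"
 * when the current epoch has elapsed, when there is no valid descriptor yet,
 * or when free disk space drops below -D blocks (in which case the epoch
 * counter wraps and old files are reused).
 */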
406 unsigned get_log_fd(char *fname, unsigned cur_fd) {
407         struct Time now;
408         unsigned cur_uptime;
409         /* If the amount of space left on the disk falls below the threshold we start
410          * reusing old logs, or bail out if that doesn't solve the problem */
411
412         struct statfs statfs;
413         int ret_fd;
414         gettime(&now);
415         cur_uptime = getuptime_minutes(&now);
416
417         if (fstatfs(cur_fd, &statfs) && cur_fd!=START_VALUE) {
418                 my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
419         }
420         else {
421                 if (min_free && statfs.f_bfree < min_free) 
422                         switch(cur_epoch) {
423                                 case 0: /* Uh oh. Our first file filled up all of the free space. Just bail out. */
424                                         my_log(LOG_ERR, "The first epoch filled up all the free space on disk. Bailing out.");
425                                         exit(1);
426                                 default:
427                                         my_log(LOG_INFO, "Disk almost full. I'm going to drop data. Max epochs = %d\n",cur_epoch);
428                                         cur_epoch = -1;
429                         }
430         }
431
432         /* Epoch length in minutes */
433         if ((cur_uptime - prev_uptime) > epoch_length || (int)cur_fd < 0 || cur_epoch==-1) {
434                 char nextname[MAX_PATH_LEN];
435                 int write_fd;
436                 prev_uptime = cur_uptime;
437                 cur_epoch = (cur_epoch + 1) % log_epochs;
438                 close(cur_fd);
439                 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
440                 if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC, 0644)) < 0) {
441                         my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
442                         exit(1);
443                 }
444                 update_cur_epoch_file(cur_epoch);
445                 ret_fd = write_fd;
446         }
447         else
448                 ret_fd = cur_fd;
449         return(ret_fd);
450 }
451
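/*
 * Search a hash chain for a flow matching 'what': same addresses and
 * protocol, plus matching ports for unfragmented flows or matching IP id for
 * fragmented ones. If 'prev' is given it is left pointing at the link that
 * references the match, so the caller can unlink it.
 */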
452 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
453 {
454         struct Flow **flowpp;
455
456 #ifdef WALL
457         flowpp = 0;
458 #endif
459
460         if (prev) flowpp = *prev;
461
462         while (where) {
463                 if (where->sip.s_addr == what->sip.s_addr
464                         && where->dip.s_addr == what->dip.s_addr
465                         && where->proto == what->proto) {
466                         switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
467                                 case 0:
468                                         /* Both unfragmented */
469                                         if ((what->sp == where->sp)
470                                                 && (what->dp == where->dp)) goto done;
471                                         break;
472                                 case 2:
473                                         /* Both fragmented */
474                                         if (where->id == what->id) goto done;
475                                         break;
476                         }
477                 }
478                 flowpp = &where->next;
479                 where = where->next;
480         }
481 done:
482         if (prev) *prev = flowpp;
483         return where;
484 }
485
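/*
 * Merge a flow into the hash table, either copying it out of the pending
 * ring (COPY_INTO) or moving an already allocated entry (MOVE_INTO). When a
 * matching entry exists its counters and flags are merged; once the last
 * fragment of a datagram is accounted for, the reassembled flow is
 * re-inserted with FLOW_FRAG cleared.
 */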
486 int put_into(struct Flow *flow, int flag
487 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
488         , char *logbuf
489 #endif
490 )
491 {
492         int ret = 0;
493         hash_t h;
494         struct Flow *flown, **flowpp;
495 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
496         char buf[64];
497 #endif
498
499         h = hash_flow(flow);
500 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
501         sprintf(buf, " %x H:%04x", (unsigned) flow, h);
502         strcat(logbuf, buf);
503 #endif
504         pthread_mutex_lock(&flows_mutex[h]);
505         flowpp = &flows[h];
506         if (!(flown = find(flows[h], flow, &flowpp))) {
507                 /* No suitable flow found - add */
508                 if (flag == COPY_INTO) {
509                         if ((flown = mem_alloc())) {
510                                 copy_flow(flow, flown);
511                                 flow = flown;
512                         } else {
513 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
514                                 my_log(LOG_ERR, "%s %s. %s",
515                                         "mem_alloc():", strerror(errno), "packet lost");
516 #endif
517                                 return -1;
518                         }
519                 }
520                 flow->next = flows[h];
521                 flows[h] = flow;
522 #if ((DEBUG) & DEBUG_I)
523                 flows_total++;
524                 if (flow->flags & FLOW_FRAG) flows_fragmented++;
525 #endif
526 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
527                 if (flown) {
528                         sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
529                         strcat(logbuf, buf);
530                 }
531 #endif
532         } else {
533                 /* Found suitable flow - update */
534 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
535                 sprintf(buf, " +> %x", (unsigned) flown);
536                 strcat(logbuf, buf);
537 #endif
538                 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
539                         flown->mtime = flow->mtime;
540                 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
541                         flown->ctime = flow->ctime;
542                 flown->tcp_flags |= flow->tcp_flags;
543                 flown->size += flow->size;
544                 flown->pkts += flow->pkts;
545                 if (flow->flags & FLOW_FRAG) {
546                         /* Fragmented flows require some additional work */
547                         if (flow->flags & FLOW_TL) {
548                                 /*
549                                 ?FIXME?
550                                 Several packets with FLOW_TL (attack)
551                                 */
552                                 flown->sp = flow->sp;
553                                 flown->dp = flow->dp;
554                         }
555                         if (flow->flags & FLOW_LASTFRAG) {
556                                 /*
557                                 ?FIXME?
558                                 Several packets with FLOW_LASTFRAG (attack)
559                                 */
560                                 flown->sizeP = flow->sizeP;
561                         }
562                         flown->flags |= flow->flags;
563                         flown->sizeF += flow->sizeF;
564                         if ((flown->flags & FLOW_LASTFRAG)
565                                 && (flown->sizeF >= flown->sizeP)) {
566                                 /* All fragments received - flow reassembled */
567                                 *flowpp = flown->next;
568                                 pthread_mutex_unlock(&flows_mutex[h]);
569 #if ((DEBUG) & DEBUG_I)
570                                 flows_total--;
571                                 flows_fragmented--;
572 #endif
573                                 flown->id = 0;
574                                 flown->flags &= ~FLOW_FRAG;
575 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
576                                 strcat(logbuf," R");
577 #endif
578                                 ret = put_into(flown, MOVE_INTO
579 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
580                                                 , logbuf
581 #endif
582                                         );
583                         }
584                 }
585                 if (flag == MOVE_INTO) mem_free(flow);
586         }
587         pthread_mutex_unlock(&flows_mutex[h]);
588         return ret;
589 }
590
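/*
 * Serialize one flow (or the packet header when 'flow' is NULL) into the
 * emit buffer, walking the field list of the selected NetFlow version and
 * writing each field in network byte order. Returns the advanced write
 * pointer.
 */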
591 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
592 {
593         int i;
594
595         for (i = 0; i < fields; i++) {
596 #if ((DEBUG) & DEBUG_F)
597                 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
598 #endif
599                 switch (format[i]) {
600                         case NETFLOW_IPV4_SRC_ADDR:
601                                 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
602                                 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
603                                 break;
604
605                         case NETFLOW_IPV4_DST_ADDR:
606                                 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
607                                 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
608                                         my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
609                                 }
610                                 p += NETFLOW_IPV4_DST_ADDR_SIZE;
611                                 break;
612
613                         case NETFLOW_INPUT_SNMP:
614                                 *((uint16_t *) p) = htons(flow->iif);
615                                 p += NETFLOW_INPUT_SNMP_SIZE;
616                                 break;
617
618                         case NETFLOW_OUTPUT_SNMP:
619                                 *((uint16_t *) p) = htons(flow->oif);
620                                 p += NETFLOW_OUTPUT_SNMP_SIZE;
621                                 break;
622
623                         case NETFLOW_PKTS_32:
624                                 *((uint32_t *) p) = htonl(flow->pkts);
625                                 p += NETFLOW_PKTS_32_SIZE;
626                                 break;
627
628                         case NETFLOW_BYTES_32:
629                                 *((uint32_t *) p) = htonl(flow->size);
630                                 p += NETFLOW_BYTES_32_SIZE;
631                                 break;
632
633                         case NETFLOW_FIRST_SWITCHED:
634                                 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
635                                 p += NETFLOW_FIRST_SWITCHED_SIZE;
636                                 break;
637
638                         case NETFLOW_LAST_SWITCHED:
639                                 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
640                                 p += NETFLOW_LAST_SWITCHED_SIZE;
641                                 break;
642
643                         case NETFLOW_L4_SRC_PORT:
644                                 *((uint16_t *) p) = flow->sp;
645                                 p += NETFLOW_L4_SRC_PORT_SIZE;
646                                 break;
647
648                         case NETFLOW_L4_DST_PORT:
649                                 *((uint16_t *) p) = flow->dp;
650                                 p += NETFLOW_L4_DST_PORT_SIZE;
651                                 break;
652
653                         case NETFLOW_PROT:
654                                 *((uint8_t *) p) = flow->proto;
655                                 p += NETFLOW_PROT_SIZE;
656                                 break;
657
658                         case NETFLOW_SRC_TOS:
659                                 *((uint8_t *) p) = flow->tos;
660                                 p += NETFLOW_SRC_TOS_SIZE;
661                                 break;
662
663                         case NETFLOW_TCP_FLAGS:
664                                 *((uint8_t *) p) = flow->tcp_flags;
665                                 p += NETFLOW_TCP_FLAGS_SIZE;
666                                 break;
667
668                         case NETFLOW_VERSION:
669                                 *((uint16_t *) p) = htons(netflow->Version);
670                                 p += NETFLOW_VERSION_SIZE;
671                                 break;
672
673                         case NETFLOW_COUNT:
674                                 *((uint16_t *) p) = htons(emit_count);
675                                 p += NETFLOW_COUNT_SIZE;
676                                 break;
677
678                         case NETFLOW_UPTIME:
679                                 *((uint32_t *) p) = htonl(getuptime(&emit_time));
680                                 p += NETFLOW_UPTIME_SIZE;
681                                 break;
682
683                         case NETFLOW_UNIX_SECS:
684                                 *((uint32_t *) p) = htonl(emit_time.sec);
685                                 p += NETFLOW_UNIX_SECS_SIZE;
686                                 break;
687
688                         case NETFLOW_UNIX_NSECS:
689                                 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
690                                 p += NETFLOW_UNIX_NSECS_SIZE;
691                                 break;
692
693                         case NETFLOW_FLOW_SEQUENCE:
694                                 //*((uint32_t *) p) = htonl(emit_sequence);
695                                 *((uint32_t *) p) = 0;
696                                 p += NETFLOW_FLOW_SEQUENCE_SIZE;
697                                 break;
698
699                         case NETFLOW_PAD8:
700                         /* Unsupported (uint8_t) */
701                         case NETFLOW_ENGINE_TYPE:
702                         case NETFLOW_ENGINE_ID:
703                         case NETFLOW_FLAGS7_1:
704                         case NETFLOW_SRC_MASK:
705                         case NETFLOW_DST_MASK:
706                                 *((uint8_t *) p) = 0;
707                                 p += NETFLOW_PAD8_SIZE;
708                                 break;
709                         case NETFLOW_XID:
710                                 *((uint16_t *) p) = flow->tos;
711                                 p += NETFLOW_XID_SIZE;
712                                 break;
713                         case NETFLOW_PAD16:
714                         /* Unsupported (uint16_t) */
715                         case NETFLOW_SRC_AS:
716                         case NETFLOW_DST_AS:
717                         case NETFLOW_FLAGS7_2:
718                                 *((uint16_t *) p) = 0;
719                                 p += NETFLOW_PAD16_SIZE;
720                                 break;
721
722                         case NETFLOW_PAD32:
723                         /* Unsupported (uint32_t) */
724                         case NETFLOW_IPV4_NEXT_HOP:
725                         case NETFLOW_ROUTER_SC:
726                                 *((uint32_t *) p) = 0;
727                                 p += NETFLOW_PAD32_SIZE;
728                                 break;
729
730                         default:
731                                 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
732                                         format, i, format[i]);
733                                 exit(1);
734                 }
735         }
736 #if ((DEBUG) & DEBUG_F)
737         my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
738 #endif
739         return p;
740 }
741
742 void setuser() {
743         /*
744         Workaround for clone()-based threads
745         Try to change EUID independently of main thread
746         */
747         if (pw) {
748                 setgroups(0, NULL);
749                 setregid(pw->pw_gid, pw->pw_gid);
750                 setreuid(pw->pw_uid, pw->pw_uid);
751         }
752 }
753
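/*
 * Emitter thread: drains the flows_emit list into emit_packet and flushes
 * the packet to every peer once it holds MaxFlows records or the emit
 * timeout expires. File peers go through get_log_fd() for epoch rotation,
 * socket peers use send(); both paths honour the -t rate limit.
 */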
754 void *emit_thread()
755 {
756         struct Flow *flow;
757         void *p;
758         struct timeval now;
759         struct timespec timeout;
760         int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
761
762         p = (void *) &emit_packet + netflow->HeaderSize;
763         timeout.tv_nsec = 0;
764
765         setuser();
766
767         for (;;) {
768                 pthread_mutex_lock(&emit_mutex);
769                 while (!flows_emit) {
770                         gettimeofday(&now, 0);
771                         timeout.tv_sec = now.tv_sec + emit_timeout;
772                         /* Do not wait until emit_packet is filled - that may take too long */
773                         if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
774                                 pthread_mutex_unlock(&emit_mutex);
775                                 goto sendit;
776                         }
777                 }
778                 flow = flows_emit;
779                 flows_emit = flows_emit->next;
780 #if ((DEBUG) & DEBUG_I)
781                 emit_queue--;
782 #endif          
783                 pthread_mutex_unlock(&emit_mutex);
784
785 #ifdef UPTIME_TRICK
786                 if (!emit_count) {
787                         gettime(&start_time);
788                         start_time.sec -= start_time_offset;
789                 }
790 #endif
791                 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
792                 mem_free(flow);
793                 emit_count++;
794 #ifdef PF2_DEBUG
795                 printf("Emit count = %d\n", emit_count);
796                 fflush(stdout);
797 #endif
798                 if (emit_count == netflow->MaxFlows) {
799                 sendit:
800                         gettime(&emit_time);
801                         p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
802                         size = netflow->HeaderSize + emit_count * netflow->FlowSize;
803                         /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
804                         if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
805                         peer_rot_cur = 0;
806                         for (i = 0; i < npeers; i++) {
807                                 if (peers[i].type == PEER_FILE) {
808                                                 if (netflow->SeqOffset)
809                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
810                                                 peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
811                                                 ret = write(peers[i].write_fd, emit_packet, size);
812                                                 if (ret < size) {
813
814 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
815                                                         my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
816                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
817 #endif
818 #undef MESSAGES
819                                                 }
820 #if ((DEBUG) & DEBUG_E)
821                                                 else {
822                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
823                                                                 emit_count, i + 1, peers[i].seq);
824                                                 }
825 #endif
826                                                 peers[i].seq += emit_count;
827
828                                                 /* Rate limit */
829                                                 if (emit_rate_bytes) {
830                                                         sent += size;
831                                                         delay = sent / emit_rate_bytes;
832                                                         if (delay) {
833                                                                 sent %= emit_rate_bytes;
834                                                                 timeout.tv_sec = 0;
835                                                                 timeout.tv_nsec = emit_rate_delay * delay;
836                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
837                                                         }
838                                                 }
839                                         }
840                                 else
841                                 if (peers[i].type == PEER_MIRROR) goto sendreal;
842                                 else
843                                 if (peers[i].type == PEER_ROTATE) 
844                                         if (peer_rot_cur++ == peer_rot_work) {
845                                         sendreal:
846                                                 if (netflow->SeqOffset)
847                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
848                                                 ret = send(peers[i].write_fd, emit_packet, size, 0);
849                                                 if (ret < size) {
850 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
851                                                         my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
852                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
853 #endif
854                                                 }
855 #if ((DEBUG) & DEBUG_E)
856                                                 else {
857                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
858                                                                 emit_count, i + 1, peers[i].seq);
859                                                 }
860 #endif
861                                                 peers[i].seq += emit_count;
862
863                                                 /* Rate limit */
864                                                 if (emit_rate_bytes) {
865                                                         sent += size;
866                                                         delay = sent / emit_rate_bytes;
867                                                         if (delay) {
868                                                                 sent %= emit_rate_bytes;
869                                                                 timeout.tv_sec = 0;
870                                                                 timeout.tv_nsec = emit_rate_delay * delay;
871                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
872                                                         }
873                                                 }
874                                         }
875                         }
876                         if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
877                         emit_sequence += emit_count;
878                         emit_count = 0;
879 #if ((DEBUG) & DEBUG_I)
880                         emit_pkts++;
881 #endif
882                 }
883         }
884 }       
885
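/*
 * Unpending thread: moves packets marked FLOW_PENDING by cap_thread() from
 * the pending ring into the flow hash table via put_into(COPY_INTO), waking
 * up on unpending_cond or after unpending_timeout seconds.
 */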
886 void *unpending_thread()
887 {
888         struct timeval now;
889         struct timespec timeout;
890 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
891         char logbuf[256];
892 #endif
893
894         setuser();
895
896         timeout.tv_nsec = 0;
897         pthread_mutex_lock(&unpending_mutex);
898
899         for (;;) {
900                 while (!(pending_tail->flags & FLOW_PENDING)) {
901                         gettimeofday(&now, 0);
902                         timeout.tv_sec = now.tv_sec + unpending_timeout;
903                         pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
904                 }
905
906 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
907                 *logbuf = 0;
908 #endif
909                 if (put_into(pending_tail, COPY_INTO
910 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
911                                 , logbuf
912 #endif
913                         ) < 0) {
914 #if ((DEBUG) & DEBUG_I)
915                         pkts_lost_unpending++;
916 #endif                          
917                 }
918
919 #if ((DEBUG) & DEBUG_U)
920                 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
921 #endif
922
923                 pending_tail->flags = 0;
924                 pending_tail = pending_tail->next;
925 #if ((DEBUG) & DEBUG_I)
926                 pkts_pending_done++;
927 #endif
928         }
929 }
930
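/*
 * Scanner thread: every scan_interval seconds walks the hash table, moves
 * expired fragmented flows onto scan_frag_dreg and aged/inactive flows onto
 * the flows_emit list (signalling emit_cond), then re-inserts the expired
 * fragments as ordinary flows.
 */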
931 void *scan_thread()
932 {
933 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
934         char logbuf[256];
935 #endif
936         int i;
937         struct Flow *flow, **flowpp;
938         struct Time now;
939         struct timespec timeout;
940
941         setuser();
942
943         timeout.tv_nsec = 0;
944         pthread_mutex_lock(&scan_mutex);
945
946         for (;;) {
947                 gettime(&now);
948                 timeout.tv_sec = now.sec + scan_interval;
949                 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
950
951                 gettime(&now);
952 #if ((DEBUG) & DEBUG_S)
953                 my_log(LOG_DEBUG, "S: %d", now.sec);
954 #endif
955                 for (i = 0; i < 1 << HASH_BITS ; i++) {
956                         pthread_mutex_lock(&flows_mutex[i]);
957                         flow = flows[i];
958                         flowpp = &flows[i];
959                         while (flow) {
960                                 if (flow->flags & FLOW_FRAG) {
961                                         /* Process fragmented flow */
962                                         if ((now.sec - flow->mtime.sec) > frag_lifetime) {
963                                                 /* Fragmented flow expired - put it into special chain */
964 #if ((DEBUG) & DEBUG_I)
965                                                 flows_fragmented--;
966                                                 flows_total--;
967 #endif
968                                                 *flowpp = flow->next;
969                                                 flow->id = 0;
970                                                 flow->flags &= ~FLOW_FRAG;
971                                                 flow->next = scan_frag_dreg;
972                                                 scan_frag_dreg = flow;
973                                                 flow = *flowpp;
974                                                 continue;
975                                         }
976                                 } else {
977                                         /* Flow is not fragmented */
978                                         if ((now.sec - flow->mtime.sec) > inactive_lifetime
979                                                 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
980                                                 /* Flow expired */
981 #if ((DEBUG) & DEBUG_S)
982                                                 my_log(LOG_DEBUG, "S: E %x", flow);
983 #endif
984 #if ((DEBUG) & DEBUG_I)
985                                                 flows_total--;
986 #endif
987                                                 *flowpp = flow->next;
988                                                 pthread_mutex_lock(&emit_mutex);
989                                                 flow->next = flows_emit;
990                                                 flows_emit = flow;
991 #if ((DEBUG) & DEBUG_I)
992                                                 emit_queue++;
993 #endif                          
994                                                 pthread_mutex_unlock(&emit_mutex);
995                                                 flow = *flowpp;
996                                                 continue;
997                                         }
998                                 }
999                                 flowpp = &flow->next;
1000                                 flow = flow->next;
1001                         } /* chain loop */
1002                         pthread_mutex_unlock(&flows_mutex[i]);
1003                 } /* hash loop */
1004                 if (flows_emit) pthread_cond_signal(&emit_cond);
1005
1006                 while (scan_frag_dreg) {
1007                         flow = scan_frag_dreg;
1008                         scan_frag_dreg = flow->next;
1009 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1010                         *logbuf = 0;
1011 #endif
1012                         put_into(flow, MOVE_INTO
1013 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1014                                 , logbuf
1015 #endif
1016                         );
1017 #if ((DEBUG) & DEBUG_S)
1018                         my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
1019 #endif
1020                 }
1021         }
1022 }
1023
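/*
 * Capture thread: reads packets from the ULOG netlink socket, decodes the
 * IP and (when present in the first fragment) transport headers into the
 * next free slot of the pending ring, and signals unpending_thread()
 * through unpending_cond.
 */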
1024 void *cap_thread()
1025 {
1026         struct ulog_packet_msg *ulog_msg;
1027         struct ip *nl;
1028         void *tl;
1029         struct Flow *flow;
1030         int len, off_frag, psize;
1031 #if ((DEBUG) & DEBUG_C)
1032         char buf[64];
1033         char logbuf[256];
1034 #endif
1035
1036         setuser();
1037
1038         while (!killed) {
1039                 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1040                 if (len <= 0) {
1041                         my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1042                         continue;
1043                 }
1044                 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1045
1046 #if ((DEBUG) & DEBUG_C)
1047                         sprintf(logbuf, "C: %d", ulog_msg->data_len);
1048 #endif
1049
1050                         nl = (void *) &ulog_msg->payload;
1051                         psize = ulog_msg->data_len;
1052
1053                         /* Sanity check */
1054                         if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1055 #if ((DEBUG) & DEBUG_C)
1056                                 strcat(logbuf, " U");
1057                                 my_log(LOG_DEBUG, "%s", logbuf);
1058 #endif
1059 #if ((DEBUG) & DEBUG_I)
1060                                 pkts_ignored++;
1061 #endif
1062                                 continue;
1063                         }
1064
1065                         if (pending_head->flags) {
1066 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1067                                 my_log(LOG_ERR,
1068 # if ((DEBUG) & DEBUG_C)
1069                                         "%s %s %s", logbuf,
1070 # else
1071                                         "%s %s",
1072 # endif
1073                                         "pending queue full:", "packet lost");
1074 #endif
1075 #if ((DEBUG) & DEBUG_I)
1076                                 pkts_lost_capture++;
1077 #endif
1078                                 goto done;
1079                         }
1080
1081 #if ((DEBUG) & DEBUG_I)
1082                         pkts_total++;
1083 #endif
1084
1085                         flow = pending_head;
1086
1087                         /* ?FIXME? Add sanity check for ip_len? */
1088                         flow->size = ntohs(nl->ip_len);
1089 #if ((DEBUG) & DEBUG_I)
1090                         size_total += flow->size;
1091 #endif
1092
1093                         flow->sip = nl->ip_src;
1094                         flow->dip = nl->ip_dst;
1095                         if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
1096                                 my_log(LOG_INFO, "Received test flow to corewars.org");
1097                         }
1098                         flow->iif = snmp_index(ulog_msg->indev_name);
1099                         flow->oif = snmp_index(ulog_msg->outdev_name);
1100                         flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1101                         flow->proto = nl->ip_p;
1102                         flow->id = 0;
1103                         flow->tcp_flags = 0;
1104                         flow->pkts = 1;
1105                         flow->sizeF = 0;
1106                         flow->sizeP = 0;
1107                         /* Packets captured from the OUTPUT chain don't contain a valid timestamp */
1108                         if (ulog_msg->timestamp_sec) {
1109                                 flow->ctime.sec = ulog_msg->timestamp_sec;
1110                                 flow->ctime.usec = ulog_msg->timestamp_usec;
1111                         } else gettime(&flow->ctime);
1112                         flow->mtime = flow->ctime;
1113
1114                         off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1115
1116                         /*
1117                         Offset (from network layer) to transport layer header/IP data
1118                         IOW IP header size ;-)
1119
1120                         ?FIXME?
1121                         Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1122                         */
1123                         off_tl = nl->ip_hl << 2;
1124                         tl = (void *) nl + off_tl;
1125
1126                         /* THIS packet's data size: data_size = total_size - ip_header_size (ip_hl * 4) */
1127                         flow->sizeF = ntohs(nl->ip_len) - off_tl;
1128                         psize -= off_tl;
1129                         if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1130                         if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1131
1132                         if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1133                                 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1134 #if ((DEBUG) & DEBUG_C)
1135                                 strcat(logbuf, " F");
1136 #endif
1137 #if ((DEBUG) & DEBUG_I)
1138                                 pkts_total_fragmented++;
1139 #endif
1140                                 flow->flags |= FLOW_FRAG;
1141                                 flow->id = nl->ip_id;
1142
1143                                 if (!(ntohs(nl->ip_off) & IP_MF)) {
1144                                         /* The packet without IP_MF set (the last fragment) lets us compute the whole datagram size */
1145                                         flow->flags |= FLOW_LASTFRAG;
1146                                         /* size = frag_offset*8 + data_size */
1147                                         flow->sizeP = off_frag + flow->sizeF;
1148                                 }
1149                         }
1150
1151 #if ((DEBUG) & DEBUG_C)
1152                         sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1153                         strcat(logbuf, buf);
1154                         sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1155                         strcat(logbuf, buf);
1156 #endif
1157
1158                         /*
1159                         Fortunately the most interesting transport layer information fits
1160                         into the first 8 bytes of the IP data field (the minimal nonzero size).
1161                         Thus we don't need actual packet reassembly to build the whole
1162                         transport layer data. We only check the fragment offset for a
1163                         zero value to find the packet carrying this information.
1164                         */
1165                         if (!off_frag && psize >= 8) {
1166                                 switch (flow->proto) {
1167                                         case IPPROTO_TCP:
1168                                         case IPPROTO_UDP:
1169                                                 flow->sp = ((struct udphdr *)tl)->uh_sport;
1170                                                 flow->dp = ((struct udphdr *)tl)->uh_dport;
1171                                                 goto tl_known;
1172
1173 #ifdef ICMP_TRICK
1174                                         case IPPROTO_ICMP:
1175                                                 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1176                                                 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1177                                                 goto tl_known;
1178 #endif
1179 #ifdef ICMP_TRICK_CISCO
1180                                         case IPPROTO_ICMP:
1181                                                 flow->dp = *((int32_t *) tl);
1182                                                 goto tl_known;
1183 #endif
1184
1185                                         default:
1186                                                 /* Unknown transport layer */
1187 #if ((DEBUG) & DEBUG_C)
1188                                                 strcat(logbuf, " U");
1189 #endif
1190                                                 flow->sp = 0;
1191                                                 flow->dp = 0;
1192                                                 break;
1193
1194                                         tl_known:
1195 #if ((DEBUG) & DEBUG_C)
1196                                                 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1197                                                 strcat(logbuf, buf);
1198 #endif
1199                                                 flow->flags |= FLOW_TL;
1200                                 }
1201                         }
1202
1203                         /* Check for tcp flags presence (including CWR and ECE). */
1204                         if (flow->proto == IPPROTO_TCP
1205                                 && off_frag < 16
1206                                 && psize >= 16 - off_frag) {
1207                                 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1208 #if ((DEBUG) & DEBUG_C)
1209                                 sprintf(buf, " TCP:%x", flow->tcp_flags);
1210                                 strcat(logbuf, buf);
1211 #endif
1212                         }
1213
1214 #if ((DEBUG) & DEBUG_C)
1215                         sprintf(buf, " => %x", (unsigned) flow);
1216                         strcat(logbuf, buf);
1217                         my_log(LOG_DEBUG, "%s", logbuf);
1218 #endif
1219
1220 #if ((DEBUG) & DEBUG_I)
1221                         pkts_pending++;
1222                         pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1223                         if (pending_queue_trace < pending_queue_trace_candidate)
1224                                 pending_queue_trace = pending_queue_trace_candidate;
1225 #endif
1226
1227                         /* Flow complete - inform unpending_thread() about it */
1228                         pending_head->flags |= FLOW_PENDING;
1229                         pending_head = pending_head->next;
1230                 done:
1231                         pthread_cond_signal(&unpending_cond);
1232                 }
1233         }
1234         return 0;
1235 }
1236
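/*
 * main(): parse the command line, set up the collector/file peers and
 * privileges, then run the capture, unpending, scan and emit threads
 * listed in threads[].
 */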
1237 int main(int argc, char **argv)
1238 {
1239         char errpbuf[512];
1240         char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1241         int c, i, write_fd, memory_limit = 0;
1242         struct addrinfo hints, *res;
1243         struct sockaddr_in saddr;
1244         pthread_attr_t tattr;
1245         struct sigaction sigact;
1246         static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1247         struct timeval timeout;
1248
1249         sched_min = sched_get_priority_min(SCHED);
1250         sched_max = sched_get_priority_max(SCHED);
1251
1252         memset(&saddr, 0 , sizeof(saddr));
1253         memset(&hints, 0 , sizeof(hints));
1254         hints.ai_flags = AI_PASSIVE;
1255         hints.ai_family = AF_INET;
1256         hints.ai_socktype = SOCK_DGRAM;
1257
1258         /* Process command line options */
1259
1260         opterr = 0;
1261         while ((c = my_getopt(argc, argv, parms)) != -1) {
1262                 switch (c) {
1263                         case '?':
1264                                 usage();
1265
1266                         case 'h':
1267                                 usage();
1268                 }
1269         }
1270
1271         if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1272         if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1273         if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1274         if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1275         if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1276         if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1277         if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1278         if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1279         if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1280         if (parms[nflag].count) {
1281                 switch (atoi(parms[nflag].arg)) {
1282                         case 1:
1283                                 netflow = &NetFlow1;
1284                                 break;
1285
1286                         case 5:
1287                                 break;
1288
1289                         case 7:
1290                                 netflow = &NetFlow7;
1291                                 break;
1292
1293                         default:
1294                                 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1295                                 exit(1);
1296                 }
1297         }
1298         if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1299         if (parms[lflag].count) {
1300                 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1301                         *log_suffix++ = 0;
1302                         if (*log_suffix) {
1303                                 sprintf(errpbuf, "[%s]", log_suffix);
1304                                 strcat(ident, errpbuf);
1305                         }
1306                 }
1307                 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1308                 if (log_suffix) *--log_suffix = ':';
1309         }
1310         if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1311         err_malloc:
1312                 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1313                 exit(1);
1314         }
1315         sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1316         if (parms[qflag].count) {
1317                 pending_queue_length = atoi(parms[qflag].arg);
1318                 if (pending_queue_length < 1) {
1319                         fprintf(stderr, "Illegal %s\n", "pending queue length");
1320                         exit(1);
1321                 }
1322         }
1323         if (parms[rflag].count) {
1324                 schedp.sched_priority = atoi(parms[rflag].arg);
1325                 if (schedp.sched_priority
1326                         && (schedp.sched_priority < sched_min
1327                                 || schedp.sched_priority > sched_max)) {
1328                         fprintf(stderr, "Illegal %s\n", "realtime priority");
1329                         exit(1);
1330                 }
1331         }
1332         if (parms[Bflag].count) {
1333                 sockbufsize = atoi(parms[Bflag].arg) << 10;
1334         }
1335         if (parms[bflag].count) {
1336                 bulk_quantity = atoi(parms[bflag].arg);
1337                 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1338                         fprintf(stderr, "Illegal %s\n", "bulk size");
1339                         exit(1);
1340                 }
1341         }
1342         if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1343         if (parms[Xflag].count) {
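		/*
		-X rules are parsed below as <basename>:<base> pairs separated by
		commas, e.g. a hypothetical "eth:1,ppp:200"; <base> is the numeric
		SNMP index base used for interfaces whose name starts with <basename>.
		*/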
1344                 for(i = 0; parms[Xflag].arg[i]; i++)
1345                         if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1346                 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1347                         goto err_malloc;
1348                 rule = strtok(parms[Xflag].arg, ":");
1349                 for (i = 0; rule; i++) {
1350                         snmp_rules[i].len = strlen(rule);
1351                         if (snmp_rules[i].len > IFNAMSIZ) {
1352                                 fprintf(stderr, "Illegal %s\n", "interface basename");
1353                                 exit(1);
1354                         }
1355                         strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1356                         if (!*(rule - 1)) *(rule - 1) = ',';
1357                         rule = strtok(NULL, ",");
1358                         if (!rule) {
1359                                 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1360                                 exit(1);
1361                         }
1362                         snmp_rules[i].base = atoi(rule);
1363                         *(rule - 1) = ':';
1364                         rule = strtok(NULL, ":");
1365                 }
1366                 nsnmp_rules = i;
1367         }
1368         if (parms[tflag].count)
1369                 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1370         if (parms[aflag].count) {
1371                 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1372                 bad_lhost:
1373                         fprintf(stderr, "Illegal %s\n", "source address");
1374                         exit(1);
1375                 } else {
1376                         saddr = *((struct sockaddr_in *) res->ai_addr);
1377                         freeaddrinfo(res);
1378                 }
1379         }
1380         if (parms[uflag].count) 
1381                 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1382                         fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1383                         exit(1);
1384                 }
1385
1386
1387         /* Process collectors parameters. Brrrr... :-[ */
1388
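	/*
	Each remaining command line argument names one collector, parsed below as
	<host>:<port>[/<local address>[/<type>]], where <type> is 'm' (mirror,
	the default) or 'r' (rotate); e.g. a hypothetical "10.0.0.1:2055//r"
	would be a rotate-type collector.
	*/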
1389         npeers = argc - optind;
	if (npeers >= 1) {
1391                 /* Send to remote Netflow collector */
1392                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1393                 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1394                         dhost = argv[i];
1395                         if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1396                         *dport++ = 0;
1397                         if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1398                                 fprintf(stderr, "socket(): %s\n", strerror(errno));
1399                                 exit(1);
1400                         }
1401                         peers[npeers].write_fd = write_fd;
1402                         peers[npeers].type = PEER_MIRROR;
1403                         peers[npeers].laddr = saddr;
1404                         peers[npeers].seq = 0;
1405                         if ((lhost = strchr(dport, '/'))) {
1406                                 *lhost++ = 0;
1407                                 if ((type = strchr(lhost, '/'))) {
1408                                         *type++ = 0;
1409                                         switch (*type) {
1410                                                 case 0:
1411                                                 case 'm':
1412                                                         break;
1413
1414                                                 case 'r':
1415                                                         peers[npeers].type = PEER_ROTATE;
1416                                                         npeers_rot++;
1417                                                         break;
1418
1419                                                 default:
1420                                                         goto bad_collector;
1421                                         }
1422                                 }
1423                                 if (*lhost) {
1424                                         if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1425                                         peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1426                                         freeaddrinfo(res);
1427                                 }
1428                         }
1429                         if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1430                                                 sizeof(struct sockaddr_in))) {
1431                                 fprintf(stderr, "bind(): %s\n", strerror(errno));
1432                                 exit(1);
1433                         }
1434                         if (getaddrinfo(dhost, dport, &hints, &res)) {
1435 bad_collector:
1436                                 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1437                                 exit(1);
1438                         }
1439                         peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1440                         freeaddrinfo(res);
1441                         if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1442                                                 sizeof(struct sockaddr_in))) {
1443                                 fprintf(stderr, "connect(): %s\n", strerror(errno));
1444                                 exit(1);
1445                         }
1446
1447                         /* Restore command line */
1448                         if (type) *--type = '/';
1449                         if (lhost) *--lhost = '/';
1450                         *--dport = ':';
1451                 }
1452         }
1453         else if (parms[fflag].count) {
1454                 // log into a file
		/* this branch runs with no collector arguments, so make room for the single file peer */
		if (!(peers = malloc((npeers + 1) * sizeof(struct peer)))) goto err_malloc;
		/* bounded, NUL-terminated copy of the log file name */
		if (!(peers[npeers].fname = calloc(1, MAX_PATH_LEN))) goto err_malloc;
		strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN - 1);
1458                 
1459                 peers[npeers].write_fd = START_VALUE;
1460                 peers[npeers].type = PEER_FILE;
1461                 peers[npeers].seq = 0;
1462
1463                 get_cur_epoch();
1464                 npeers++;
1465         }
1466         else 
1467                 usage();
1468
1469
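	/*
	Set up the netlink ULOG capture: allocate the capture buffer and bind to
	the group mask given with -U; -B, if given, enlarges the socket receive
	buffer.
	*/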
1470         if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1471         ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1472         if (!ulog_handle) {
		fprintf(stderr, "libipulog initialization error: %s\n",
			ipulog_strerror(ipulog_errno));
1475                 exit(1);
1476         }
1477         if (sockbufsize)
1478                 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1479                         &sockbufsize, sizeof(sockbufsize)) < 0)
			fprintf(stderr, "setsockopt(): %s\n", strerror(errno));
1481
1482         /* Daemonize (if log destination stdout-free) */
1483
1484         my_log_open(ident, verbosity, log_dest);
1485         if (!(log_dest & 2)) {
1486                 /* Crash-proofing - Sapan*/
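		/*
		The parent process stays resident as a watchdog: it forks a worker
		child, waits for it to exit and then forks a replacement. The child
		detaches with setsid(), points stdio at /dev/null and breaks out of
		the loop to do the actual work.
		*/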
1487                 while (1) {
			int pid = fork();
			if (pid == -1) {
				fprintf(stderr, "fork(): %s\n", strerror(errno));
				exit(1);
			}
			else if (pid == 0) {
				setsid();
				freopen("/dev/null", "r", stdin);
				freopen("/dev/null", "w", stdout);
				freopen("/dev/null", "w", stderr);
				break;
			}
			else {
				while (wait3(NULL, 0, NULL) < 1);
			}
1503                 }
1504         } else {
1505                 setvbuf(stdout, (char *)0, _IONBF, 0);
1506                 setvbuf(stderr, (char *)0, _IONBF, 0);
1507         }
1508
1509         pid = getpid();
1510         sprintf(errpbuf, "[%ld]", (long) pid);
1511         strcat(ident, errpbuf);
1512
1513         /* Initialization */
1514
1515         hash_init(); /* Actually for crc16 only */
1516         mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1517         for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1518
1519 #ifdef UPTIME_TRICK
1520         /* Hope 12 days is enough :-/ */
1521         start_time_offset = 1 << 20;
1522
1523         /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1524 #endif
1525         gettime(&start_time);
1526
1527         /*
1528         Build static pending queue as circular buffer.
1529         */
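	/* The ring holds pending_queue_length elements: the head plus
	   pending_queue_length - 1 more allocated below. */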
1530         if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1531         pending_tail = pending_head;
1532         for (i = pending_queue_length - 1; i--;) {
1533                 if (!(pending_tail->next = mem_alloc())) {
1534                 err_mem_alloc:
1535                         my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1536                         exit(1);
1537                 }
1538                 pending_tail = pending_tail->next;
1539         }
1540         pending_tail->next = pending_head;
1541         pending_tail = pending_head;
1542
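	/*
	Install the SIGTERM handler (plus SIGUSR1 in DEBUG_I builds) and keep
	those signals blocked while the worker threads start; they are unblocked
	again just before the main loop below.
	*/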
1543         sigemptyset(&sig_mask);
1544         sigact.sa_handler = &sighandler;
1545         sigact.sa_mask = sig_mask;
1546         sigact.sa_flags = 0;
1547         sigaddset(&sig_mask, SIGTERM);
1548         sigaction(SIGTERM, &sigact, 0);
1549 #if ((DEBUG) & DEBUG_I)
1550         sigaddset(&sig_mask, SIGUSR1);
1551         sigaction(SIGUSR1, &sigact, 0);
1552 #endif
1553         if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1554                 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1555                 exit(1);
1556         }
1557
1558         my_log(LOG_INFO, "Starting %s...", VERSION);
1559
1560         if (parms[cflag].count) {
1561                 if (chdir(parms[cflag].arg) || chroot(".")) {
1562                         my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1563                         exit(1);
1564                 }
1565         }
1566
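	/*
	Create the worker threads. When a realtime priority was given with -r,
	each thread is started with the SCHED policy and a successively higher
	priority.
	*/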
1567         schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1568         pthread_attr_init(&tattr);
1569         for (i = 0; i < THREADS - 1; i++) {
1570                 if (schedp.sched_priority > 0) {
1571                         if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1572                                 (pthread_attr_setschedparam(&tattr, &schedp))) {
1573                                 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1574                                 exit(1);
1575                         }
1576                 }
1577                 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1578                         my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1579                         exit(1);
1580                 }
1581                 pthread_detach(thid);
1582                 schedp.sched_priority++;
1583         }
1584
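	/* If -u was given, drop privileges: clear supplementary groups, then set
	   the real and effective gid and uid to those of the target user. */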
1585         if (pw) {
1586                 if (setgroups(0, NULL)) {
1587                         my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1588                         exit(1);
1589                 }
1590                 if (setregid(pw->pw_gid, pw->pw_gid)) {
1591                         my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1592                         exit(1);
1593                 }
1594                 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1595                         my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1596                         exit(1);
1597                 }
1598         }
1599
1600         if (!(pidfile = fopen(pidfilepath, "w")))
1601                 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1602         else {
1603                 fprintf(pidfile, "%ld\n", (long) pid);
1604                 fclose(pidfile);
1605         }
1606
1607         my_log(LOG_INFO, "pid: %d", pid);
1608         my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1609                 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1610                 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1611                 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1612                 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1613                 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1614                 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1615         for (i = 0; i < nsnmp_rules; i++) {
1616                 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1617                         i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1618         }
1619         for (i = 0; i < npeers; i++) {
		switch (peers[i].type) {
			case PEER_MIRROR:
				c = 'm';
				break;
			case PEER_ROTATE:
				c = 'r';
				break;
			case PEER_FILE:
				/* file peers carry a path, not a collector address */
				my_log(LOG_INFO, "collector #%d: file %s", i + 1, peers[i].fname);
				continue;
		}
1628                 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1629                 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1630                         inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1631         }
1632
1633         pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1634
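	/*
	Main loop: sleep scan_interval seconds per pass and keep running until a
	SIGTERM has been handled (killed) and the flow cache, the emit counter
	and the pending queue have all drained.
	*/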
1635         timeout.tv_usec = 0;
1636         while (!killed
1637                 || (total_elements - free_elements - pending_queue_length)
1638                 || emit_count
1639                 || pending_tail->flags) {
1640
1641                 if (!sigs) {
1642                         timeout.tv_sec = scan_interval;
1643                         select(0, 0, 0, 0, &timeout);
1644                 }
1645
1646                 if (sigs & SIGTERM_MASK && !killed) {
1647                         sigs &= ~SIGTERM_MASK;
1648                         my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1649                         scan_interval = 1;
1650                         frag_lifetime = -1;
1651                         active_lifetime = -1;
1652                         inactive_lifetime = -1;
1653                         emit_timeout = 1;
1654                         unpending_timeout = 1;
1655                         killed = 1;
1656                         pthread_cond_signal(&scan_cond);
1657                         pthread_cond_signal(&unpending_cond);
1658                 }
1659
1660 #if ((DEBUG) & DEBUG_I)
1661                 if (sigs & SIGUSR1_MASK) {
1662                         sigs &= ~SIGUSR1_MASK;
1663                         info_debug();
1664                 }
1665 #endif
1666         }
1667         remove(pidfilepath);
1668 #if ((DEBUG) & DEBUG_I)
1669         info_debug();
1670 #endif
1671         my_log(LOG_INFO, "Done.");
1672 #ifdef WALL
1673         return 0;
1674 #endif
1675 }