Frustrating bug
[fprobe-ulog.git] / src / fprobe-ulog.c
1 /*
2         Copyright (C) Slava Astashonok <sla@0n.ru>
3
4         This program is free software; you can redistribute it and/or
5         modify it under the terms of the GNU General Public License.
6
7         $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
8
9         7/11/2007       Sapan Bhatia <sapanb@cs.princeton.edu> 
10                         
11                 Added data collection (-f) functionality, xid support in the header and log file
12                 rotation.
13 */
14
15 #include <common.h>
16
17 /* stdout, stderr, freopen() */
18 #include <stdio.h>
19
20 /* atoi(), exit() */
21 #include <stdlib.h>
22
23 /* getopt(), alarm(), getpid(), sedsid(), chdir() */
24 #include <unistd.h>
25
26 /* strerror() */
27 #include <string.h>
28
29 /* sig*() */
30 #include <signal.h>
31
32 /* statfs() */
33
34 #include <sys/statfs.h>
35
36 #include <libipulog/libipulog.h>
/*
 * Local re-declaration of libipulog's handle structure so that this file can
 * reach its members (e.g. the descriptor) through ulog_handle.
 * NOTE(review): presumably mirrors libipulog's private layout; member order
 * and types must stay in sync with the library version linked against.
 */
struct ipulog_handle {
	int fd;                         /* socket descriptor (netlink, judging by the sockaddr_nl members) */
	u_int8_t blocking;
	struct sockaddr_nl local, peer; /* same two members, same order as before */
	struct nlmsghdr *last_nlhdr;
};
44
45 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
46 #include <sys/types.h>
47 #include <netinet/in_systm.h>
48 #include <sys/socket.h>
49 #include <netinet/in.h>
50 #include <arpa/inet.h>
51 #include <netinet/ip.h>
52 #include <netinet/tcp.h>
53 #include <netinet/udp.h>
54 #include <netinet/ip_icmp.h>
55 #include <net/if.h>
56
57 #include <sys/param.h>
58 #include <pwd.h>
59 #ifdef OS_LINUX
60 #include <grp.h>
61 #endif
62
63 /* pthread_*() */
64 #include <pthread.h>
65
66 /* errno */
67 #include <errno.h>
68
69 /* getaddrinfo() */
70 #include <netdb.h>
71
72 /* nanosleep() */
73 #include <time.h>
74
75 /* gettimeofday() */
76 #include <sys/time.h>
77
78 /* scheduling */
79 #include <sched.h>
80
81 /* select() (POSIX)*/
82 #include <sys/select.h>
83
84 /* open() */
85 #include <sys/stat.h>
86 #include <fcntl.h>
87
88 #include <fprobe-ulog.h>
89 #include <my_log.h>
90 #include <my_getopt.h>
91 #include <netflow.h>
92 #include <hash.h>
93 #include <mem.h>
94
95 enum {
96         aflag,
97         Bflag,
98         bflag,
99         cflag,
100         dflag,
101         Dflag,
102         eflag,
103         Eflag,
104         fflag,
105         gflag,
106         hflag,
107         lflag,
108         mflag,
109         Mflag,
110         nflag,
111         qflag,
112         rflag,
113         sflag,
114         tflag,
115         Tflag,
116         Uflag,
117         uflag,
118         vflag,
119         Wflag,
120         Xflag,
121 };
122
123 static struct getopt_parms parms[] = {
124         {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
125         {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
126         {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
127         {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
128         {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
129         {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
130         {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
131         {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
132         {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
133         {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
134         {'h', 0, 0, 0},
135         {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
136         {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
137         {'M', 0, 0, 0},
138         {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
139         {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
140         {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
141         {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
142         {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
143         {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
144         {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
145         {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
146         {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
147         {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
148         {0, 0, 0, 0}
149 };
150
/* getopt() state variables (also declared by <unistd.h>). */
extern char *optarg;
extern int optind, opterr, optopt;
/*
 * errno is provided by <errno.h> and must not be redeclared: C11 7.5p2 makes
 * declaring an identifier named errno undefined behaviour, and on libcs where
 * errno is a thread-local object an "extern int errno;" conflicts outright.
 */

/* NetFlow version descriptors defined in another translation unit. */
extern struct NetFlow NetFlow1;
extern struct NetFlow NetFlow5;
extern struct NetFlow NetFlow7;
158
159 #define START_VALUE -5
160 #define mark_is_tos parms[Mflag].count
161 static unsigned scan_interval = 5;
162 static int min_free = 0;
163 static int frag_lifetime = 30;
164 static int inactive_lifetime = 60;
165 static int active_lifetime = 300;
166 static int sockbufsize;
167 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
168 #if (MEM_BITS == 0) || (MEM_BITS == 16)
169 #define BULK_QUANTITY 10000
170 #else
171 #define BULK_QUANTITY 200
172 #endif
173
174 static unsigned epoch_length=60, log_epochs=1; 
175 static unsigned cur_epoch=0,prev_uptime=0;
176
177 static unsigned bulk_quantity = BULK_QUANTITY;
178 static unsigned pending_queue_length = 100;
179 static struct NetFlow *netflow = &NetFlow5;
180 static unsigned verbosity = 6;
181 static unsigned log_dest = MY_LOG_SYSLOG;
182 static struct Time start_time;
183 static long start_time_offset;
184 static int off_tl;
185 /* From mem.c */
186 extern unsigned total_elements;
187 extern unsigned free_elements;
188 extern unsigned total_memory;
189 #if ((DEBUG) & DEBUG_I)
190 static unsigned emit_pkts, emit_queue;
191 static uint64_t size_total;
192 static unsigned pkts_total, pkts_total_fragmented;
193 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
194 static unsigned pkts_pending, pkts_pending_done;
195 static unsigned pending_queue_trace, pending_queue_trace_candidate;
196 static unsigned flows_total, flows_fragmented;
197 #endif
198 static unsigned emit_count;
199 static uint32_t emit_sequence;
200 static unsigned emit_rate_bytes, emit_rate_delay;
201 static struct Time emit_time;
202 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
203 static pthread_t thid;
204 static sigset_t sig_mask;
205 static struct sched_param schedp;
206 static int sched_min, sched_max;
207 static int npeers, npeers_rot;
208 static struct peer *peers;
209 static int sigs;
210
211 static struct Flow *flows[1 << HASH_BITS];
212 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
213
214 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
215 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
216
217 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
218 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
219 static struct Flow *pending_head, *pending_tail;
220 static struct Flow *scan_frag_dreg;
221
222 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
223 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
224 static struct Flow *flows_emit;
225
226 static char ident[256] = "fprobe-ulog";
227 static FILE *pidfile;
228 static char *pidfilepath;
229 static pid_t pid;
230 static int killed;
231 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
232 static struct ipulog_handle *ulog_handle;
233 static uint32_t ulog_gmask = 1;
234 static char *cap_buf;
235 static int nsnmp_rules;
236 static struct snmp_rule *snmp_rules;
237 static struct passwd *pw = 0;
238
239 void usage()
240 {
241         fprintf(stdout,
242                 "fprobe-ulog: a NetFlow probe. Version %s\n"
243                 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
244                 "\n"
245                 "-h\t\tDisplay this help\n"
246                 "-U <mask>\tULOG group bitwise mask [1]\n"
247                 "-s <seconds>\tHow often scan for expired flows [5]\n"
248                 "-g <seconds>\tFragmented flow lifetime [30]\n"
249                 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
250                 "-f <filename>\tLog flow data in a file\n"
251                 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
252                 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
253                 "-a <address>\tUse <address> as source for NetFlow flow\n"
254                 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
255                 "-M\t\tUse netfilter mark value as ToS flag\n"
256                 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
257                 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
258                 "-q <flows>\tPending queue length [100]\n"
259                 "-B <kilobytes>\tKernel capture buffer size [0]\n"
260                 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
261                 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
262                 "-c <directory>\tDirectory to chroot to\n"
263                 "-u <user>\tUser to run as\n"
264                 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
265                 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
266                 "-y <remote:port>\tAddress of the NetFlow collector\n"
267                 "-f <writable file>\tFile to write data into\n"
268                 "-T <n>\tRotate log file every n epochs\n"
269                 "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
270                 "-E <[1..60]>\tSize of an epoch in minutes\n"
271                 "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
272                 ,
273                 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
274         exit(0);
275 }
276
#if ((DEBUG) & DEBUG_I)
/*
 * Dump the internal statistics counters at LOG_DEBUG (built only with DEBUG_I).
 * The counters are unsigned and size_total is uint64_t, so the format uses
 * %u / %llu. The previous %d / %lld did not match the argument types.
 */
void info_debug(void)
{
	my_log(LOG_DEBUG, "I: received:%u/%u (%llu) pending:%u/%u",
		pkts_total, pkts_total_fragmented, (unsigned long long) size_total,
		pkts_pending - pkts_pending_done, pending_queue_trace);
	my_log(LOG_DEBUG, "I: ignored:%u lost:%u+%u",
		pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
	my_log(LOG_DEBUG, "I: cache:%u/%u emit:%u/%u/%u",
		flows_total, flows_fragmented, (unsigned) emit_sequence, emit_pkts, emit_queue);
	my_log(LOG_DEBUG, "I: memory:%u/%u (%u)",
		total_elements, free_elements, total_memory);
}
#endif
291
292 void sighandler(int sig)
293 {
294         switch (sig) {
295                 case SIGTERM:
296                         sigs |= SIGTERM_MASK;
297                         break;
298 #if ((DEBUG) & DEBUG_I)
299                 case SIGUSR1:
300                         sigs |= SIGUSR1_MASK;
301                         break;
302 #endif
303         }
304 }
305
306 void gettime(struct Time *now)
307 {
308         struct timeval t;
309
310         gettimeofday(&t, 0);
311         now->sec = t.tv_sec;
312         now->usec = t.tv_usec;
313 }
314
315
316 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
317 {
318         return (t1->sec - t2->sec)/60;
319 }
320
321 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
322 {
323         return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
324 }
325
326 /* Uptime in miliseconds */
327 uint32_t getuptime(struct Time *t)
328 {
329         /* Maximum uptime is about 49/2 days */
330         return cmpmtime(t, &start_time);
331 }
332
333 /* Uptime in minutes */
334 uint32_t getuptime_minutes(struct Time *t)
335 {
336         /* Maximum uptime is about 49/2 days */
337         return cmpMtime(t, &start_time);
338 }
339
340 hash_t hash_flow(struct Flow *flow)
341 {
342         if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
343         else return hash(flow, sizeof(struct Flow_TL));
344 }
345
346 uint16_t snmp_index(char *name) {
347         uint32_t i;
348
349         if (!*name) return 0;
350
351         for (i = 0; (int) i < nsnmp_rules; i++) {
352                 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
353                 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
354         }
355
356         if ((i = if_nametoindex(name))) return i;
357
358         return -1;
359 }
360
361 inline void copy_flow(struct Flow *src, struct Flow *dst)
362 {
363         dst->iif = src->iif;
364         dst->oif = src->oif;
365         dst->sip = src->sip;
366         dst->dip = src->dip;
367         dst->tos = src->tos;
368         dst->proto = src->proto;
369         dst->tcp_flags = src->tcp_flags;
370         dst->id = src->id;
371         dst->sp = src->sp;
372         dst->dp = src->dp;
373         dst->pkts = src->pkts;
374         dst->size = src->size;
375         dst->sizeF = src->sizeF;
376         dst->sizeP = src->sizeP;
377         dst->ctime = src->ctime;
378         dst->mtime = src->mtime;
379         dst->flags = src->flags;
380 }
381
382 void get_cur_epoch() {
383         int fd;
384         fd = open("/tmp/fprobe_last_epoch",O_RDONLY);
385         if (fd != -1) {
386                 char snum[7];
387                 ssize_t len;
388                 len = read(fd, snum, sizeof(snum)-1);
389                 if (len != -1) {
390                         snum[len]='\0';
391                         sscanf(snum,"%d",&cur_epoch);
392                         cur_epoch++; /* Let's not stone the last epoch */
393                         close(fd);
394                 }
395         }
396         return;
397 }
398
399
/*
 * Persist epoch number n to /tmp/fprobe_last_epoch (truncating it) so that
 * get_cur_epoch() can resume numbering after a restart. Failures are logged
 * and otherwise ignored.
 *
 * Fixes:
 * - O_CREAT now gets an explicit mode. Without one, the new file's
 *   permissions came from whatever was in the missing third argument.
 * - The formatted length is bounded by the buffer. Previously snprintf's
 *   return value could exceed the bytes actually stored, and write() then
 *   read past them.
 * - Short or failed writes are reported.
 * NOTE(review): get_cur_epoch() reads back at most 6 characters.
 */
void update_cur_epoch_file(int n) {
	int fd, len;
	char snum[16]; /* room for any int plus the terminator */

	len = snprintf(snum, sizeof(snum), "%d", n);
	if (len < 0 || (size_t) len >= sizeof(snum)) {
		my_log(LOG_ERR, "snprintf() failed for epoch %d", n);
		return;
	}
	fd = open("/tmp/fprobe_last_epoch", O_WRONLY | O_CREAT | O_TRUNC, 0644);
	if (fd == -1) {
		my_log(LOG_ERR, "open() failed: /tmp/fprobe_last_epoch.The next restart will resume logging from epoch id 0.");
		return;
	}
	if (write(fd, snum, len) != len)
		my_log(LOG_ERR, "write() failed: /tmp/fprobe_last_epoch (%s)", strerror(errno));
	close(fd);
}
412
413 unsigned get_log_fd(char *fname, int cur_fd) {
414         struct Time now;
415         unsigned cur_uptime;
416         /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
417          * doesn't solve the problem */
418
419         struct statfs statfs;
420         int ret_fd;
421         gettime(&now);
422         cur_uptime = getuptime_minutes(&now);
423
424
425         if (fstatfs(cur_fd, &statfs) && cur_fd!=START_VALUE) {
426                 my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
427         }
428         else {
429                 if (min_free && (statfs.f_bfree < min_free)) 
430                         switch(cur_epoch) {
431                                 case 0: /* Uh oh. Our first file filled up all of the free space. Just bail out. */
432                                         my_log(LOG_ERR, "The first epoch filled up all the free space on disk. Bailing out.");
433                                         exit(1);
434                                 default:
435                                         my_log(LOG_INFO, "Disk almost full. I'm going to drop data. Max epochs = %d\n",cur_epoch);
436                                         cur_epoch = -1;
437                         }
438         }
439
440         /* Epoch length in minutes */
441         if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
442                 char nextname[MAX_PATH_LEN];
443                 int write_fd;
444                 prev_uptime = cur_uptime;
445                 cur_epoch = (cur_epoch + 1) % log_epochs;
446                 if (cur_fd>0)
447                         close(cur_fd);
448                 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
449                 if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC)) < 0) {
450                         my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
451                         exit(1);
452                 }
453                 update_cur_epoch_file(cur_epoch);
454                 ret_fd = write_fd;
455         }
456         else {
457                 ret_fd = cur_fd;
458         }
459         return(ret_fd);
460 }
461
462 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
463 {
464         struct Flow **flowpp;
465
466 #ifdef WALL
467         flowpp = 0;
468 #endif
469
470         if (prev) flowpp = *prev;
471
472         while (where) {
473                 if (where->sip.s_addr == what->sip.s_addr
474                         && where->dip.s_addr == what->dip.s_addr
475                         && where->proto == what->proto) {
476                         switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
477                                 case 0:
478                                         /* Both unfragmented */
479                                         if ((what->sp == where->sp)
480                                                 && (what->dp == where->dp)) goto done;
481                                         break;
482                                 case 2:
483                                         /* Both fragmented */
484                                         if (where->id == what->id) goto done;
485                                         break;
486                         }
487                 }
488                 flowpp = &where->next;
489                 where = where->next;
490         }
491 done:
492         if (prev) *prev = flowpp;
493         return where;
494 }
495
496 int put_into(struct Flow *flow, int flag
497 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
498         , char *logbuf
499 #endif
500 )
501 {
502         int ret = 0;
503         hash_t h;
504         struct Flow *flown, **flowpp;
505 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
506         char buf[64];
507 #endif
508
509         h = hash_flow(flow);
510 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
511         sprintf(buf, " %x H:%04x", (unsigned) flow, h);
512         strcat(logbuf, buf);
513 #endif
514         pthread_mutex_lock(&flows_mutex[h]);
515         flowpp = &flows[h];
516         if (!(flown = find(flows[h], flow, &flowpp))) {
517                 /* No suitable flow found - add */
518                 if (flag == COPY_INTO) {
519                         if ((flown = mem_alloc())) {
520                                 copy_flow(flow, flown);
521                                 flow = flown;
522                         } else {
523 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
524                                 my_log(LOG_ERR, "%s %s. %s",
525                                         "mem_alloc():", strerror(errno), "packet lost");
526 #endif
527                                 return -1;
528                         }
529                 }
530                 flow->next = flows[h];
531                 flows[h] = flow;
532 #if ((DEBUG) & DEBUG_I)
533                 flows_total++;
534                 if (flow->flags & FLOW_FRAG) flows_fragmented++;
535 #endif
536 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
537                 if (flown) {
538                         sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
539                         strcat(logbuf, buf);
540                 }
541 #endif
542         } else {
543                 /* Found suitable flow - update */
544 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
545                 sprintf(buf, " +> %x", (unsigned) flown);
546                 strcat(logbuf, buf);
547 #endif
548                 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
549                         flown->mtime = flow->mtime;
550                 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
551                         flown->ctime = flow->ctime;
552                 flown->tcp_flags |= flow->tcp_flags;
553                 flown->size += flow->size;
554                 flown->pkts += flow->pkts;
555                 if (flow->flags & FLOW_FRAG) {
556                         /* Fragmented flow require some additional work */
557                         if (flow->flags & FLOW_TL) {
558                                 /*
559                                 ?FIXME?
560                                 Several packets with FLOW_TL (attack)
561                                 */
562                                 flown->sp = flow->sp;
563                                 flown->dp = flow->dp;
564                         }
565                         if (flow->flags & FLOW_LASTFRAG) {
566                                 /*
567                                 ?FIXME?
568                                 Several packets with FLOW_LASTFRAG (attack)
569                                 */
570                                 flown->sizeP = flow->sizeP;
571                         }
572                         flown->flags |= flow->flags;
573                         flown->sizeF += flow->sizeF;
574                         if ((flown->flags & FLOW_LASTFRAG)
575                                 && (flown->sizeF >= flown->sizeP)) {
576                                 /* All fragments received - flow reassembled */
577                                 *flowpp = flown->next;
578                                 pthread_mutex_unlock(&flows_mutex[h]);
579 #if ((DEBUG) & DEBUG_I)
580                                 flows_total--;
581                                 flows_fragmented--;
582 #endif
583                                 flown->id = 0;
584                                 flown->flags &= ~FLOW_FRAG;
585 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
586                                 strcat(logbuf," R");
587 #endif
588                                 ret = put_into(flown, MOVE_INTO
589 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
590                                                 , logbuf
591 #endif
592                                         );
593                         }
594                 }
595                 if (flag == MOVE_INTO) mem_free(flow);
596         }
597         pthread_mutex_unlock(&flows_mutex[h]);
598         return ret;
599 }
600
601 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
602 {
603         int i;
604
605         for (i = 0; i < fields; i++) {
606 #if ((DEBUG) & DEBUG_F)
607                 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
608 #endif
609                 switch (format[i]) {
610                         case NETFLOW_IPV4_SRC_ADDR:
611                                 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
612                                 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
613                                 break;
614
615                         case NETFLOW_IPV4_DST_ADDR:
616                                 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
617                                 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
618                                         my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
619                                 }
620                                 p += NETFLOW_IPV4_DST_ADDR_SIZE;
621                                 break;
622
623                         case NETFLOW_INPUT_SNMP:
624                                 *((uint16_t *) p) = htons(flow->iif);
625                                 p += NETFLOW_INPUT_SNMP_SIZE;
626                                 break;
627
628                         case NETFLOW_OUTPUT_SNMP:
629                                 *((uint16_t *) p) = htons(flow->oif);
630                                 p += NETFLOW_OUTPUT_SNMP_SIZE;
631                                 break;
632
633                         case NETFLOW_PKTS_32:
634                                 *((uint32_t *) p) = htonl(flow->pkts);
635                                 p += NETFLOW_PKTS_32_SIZE;
636                                 break;
637
638                         case NETFLOW_BYTES_32:
639                                 *((uint32_t *) p) = htonl(flow->size);
640                                 p += NETFLOW_BYTES_32_SIZE;
641                                 break;
642
643                         case NETFLOW_FIRST_SWITCHED:
644                                 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
645                                 p += NETFLOW_FIRST_SWITCHED_SIZE;
646                                 break;
647
648                         case NETFLOW_LAST_SWITCHED:
649                                 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
650                                 p += NETFLOW_LAST_SWITCHED_SIZE;
651                                 break;
652
653                         case NETFLOW_L4_SRC_PORT:
654                                 *((uint16_t *) p) = flow->sp;
655                                 p += NETFLOW_L4_SRC_PORT_SIZE;
656                                 break;
657
658                         case NETFLOW_L4_DST_PORT:
659                                 *((uint16_t *) p) = flow->dp;
660                                 p += NETFLOW_L4_DST_PORT_SIZE;
661                                 break;
662
663                         case NETFLOW_PROT:
664                                 *((uint8_t *) p) = flow->proto;
665                                 p += NETFLOW_PROT_SIZE;
666                                 break;
667
668                         case NETFLOW_SRC_TOS:
669                                 *((uint8_t *) p) = flow->tos;
670                                 p += NETFLOW_SRC_TOS_SIZE;
671                                 break;
672
673                         case NETFLOW_TCP_FLAGS:
674                                 *((uint8_t *) p) = flow->tcp_flags;
675                                 p += NETFLOW_TCP_FLAGS_SIZE;
676                                 break;
677
678                         case NETFLOW_VERSION:
679                                 *((uint16_t *) p) = htons(netflow->Version);
680                                 p += NETFLOW_VERSION_SIZE;
681                                 break;
682
683                         case NETFLOW_COUNT:
684                                 *((uint16_t *) p) = htons(emit_count);
685                                 p += NETFLOW_COUNT_SIZE;
686                                 break;
687
688                         case NETFLOW_UPTIME:
689                                 *((uint32_t *) p) = htonl(getuptime(&emit_time));
690                                 p += NETFLOW_UPTIME_SIZE;
691                                 break;
692
693                         case NETFLOW_UNIX_SECS:
694                                 *((uint32_t *) p) = htonl(emit_time.sec);
695                                 p += NETFLOW_UNIX_SECS_SIZE;
696                                 break;
697
698                         case NETFLOW_UNIX_NSECS:
699                                 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
700                                 p += NETFLOW_UNIX_NSECS_SIZE;
701                                 break;
702
703                         case NETFLOW_FLOW_SEQUENCE:
704                                 //*((uint32_t *) p) = htonl(emit_sequence);
705                                 *((uint32_t *) p) = 0;
706                                 p += NETFLOW_FLOW_SEQUENCE_SIZE;
707                                 break;
708
709                         case NETFLOW_PAD8:
710                         /* Unsupported (uint8_t) */
711                         case NETFLOW_ENGINE_TYPE:
712                         case NETFLOW_ENGINE_ID:
713                         case NETFLOW_FLAGS7_1:
714                         case NETFLOW_SRC_MASK:
715                         case NETFLOW_DST_MASK:
716                                 *((uint8_t *) p) = 0;
717                                 p += NETFLOW_PAD8_SIZE;
718                                 break;
719                         case NETFLOW_XID:
720                                 *((uint16_t *) p) = flow->tos;
721                                 p += NETFLOW_XID_SIZE;
722                                 break;
723                         case NETFLOW_PAD16:
724                         /* Unsupported (uint16_t) */
725                         case NETFLOW_SRC_AS:
726                         case NETFLOW_DST_AS:
727                         case NETFLOW_FLAGS7_2:
728                                 *((uint16_t *) p) = 0;
729                                 p += NETFLOW_PAD16_SIZE;
730                                 break;
731
732                         case NETFLOW_PAD32:
733                         /* Unsupported (uint32_t) */
734                         case NETFLOW_IPV4_NEXT_HOP:
735                         case NETFLOW_ROUTER_SC:
736                                 *((uint32_t *) p) = 0;
737                                 p += NETFLOW_PAD32_SIZE;
738                                 break;
739
740                         default:
741                                 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
742                                         format, i, format[i]);
743                                 exit(1);
744                 }
745         }
746 #if ((DEBUG) & DEBUG_F)
747         my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
748 #endif
749         return p;
750 }
751
752 void setuser() {
753         /*
754         Workaround for clone()-based threads
755         Try to change EUID independently of main thread
756         */
757         if (pw) {
758                 setgroups(0, NULL);
759                 setregid(pw->pw_gid, pw->pw_gid);
760                 setreuid(pw->pw_uid, pw->pw_uid);
761         }
762 }
763
764 void *emit_thread()
765 {
766         struct Flow *flow;
767         void *p;
768         struct timeval now;
769         struct timespec timeout;
770         int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
771
772         p = (void *) &emit_packet + netflow->HeaderSize;
773         timeout.tv_nsec = 0;
774
775         setuser();
776
777         for (;;) {
778                 pthread_mutex_lock(&emit_mutex);
779                 while (!flows_emit) {
780                         gettimeofday(&now, 0);
781                         timeout.tv_sec = now.tv_sec + emit_timeout;
782                         /* Do not wait until emit_packet will filled - it may be too long */
783                         if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
784                                 pthread_mutex_unlock(&emit_mutex);
785                                 goto sendit;
786                         }
787                 }
788                 flow = flows_emit;
789                 flows_emit = flows_emit->next;
790 #if ((DEBUG) & DEBUG_I)
791                 emit_queue--;
792 #endif          
793                 pthread_mutex_unlock(&emit_mutex);
794
795 #ifdef UPTIME_TRICK
796                 if (!emit_count) {
797                         gettime(&start_time);
798                         start_time.sec -= start_time_offset;
799                 }
800 #endif
801                 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
802                 mem_free(flow);
803                 emit_count++;
804 #ifdef PF2_DEBUG
805                 printf("Emit count = %d\n", emit_count);
806                 fflush(stdout);
807 #endif
808                 if (emit_count == netflow->MaxFlows) {
809                 sendit:
810                         gettime(&emit_time);
811                         p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
812                         size = netflow->HeaderSize + emit_count * netflow->FlowSize;
813                         /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
814                         if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
815                         peer_rot_cur = 0;
816                         for (i = 0; i < npeers; i++) {
817                                 if (peers[i].type == PEER_FILE) {
818                                                 if (netflow->SeqOffset)
819                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
820                                                 peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
821                                                 ret = write(peers[i].write_fd, emit_packet, size);
822                                                 if (ret < size) {
823
824 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
825                                                         my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
826                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
827 #endif
828 #undef MESSAGES
829                                                 }
830 #if ((DEBUG) & DEBUG_E)
831                                                 commaneelse {
832                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
833                                                                 emit_count, i + 1, peers[i].seq);
834                                                 }
835 #endif
836                                                 peers[i].seq += emit_count;
837
838                                                 /* Rate limit */
839                                                 if (emit_rate_bytes) {
840                                                         sent += size;
841                                                         delay = sent / emit_rate_bytes;
842                                                         if (delay) {
843                                                                 sent %= emit_rate_bytes;
844                                                                 timeout.tv_sec = 0;
845                                                                 timeout.tv_nsec = emit_rate_delay * delay;
846                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
847                                                         }
848                                                 }
849                                         }
850                                 else
851                                 if (peers[i].type == PEER_MIRROR) goto sendreal;
852                                 else
853                                 if (peers[i].type == PEER_ROTATE) 
854                                         if (peer_rot_cur++ == peer_rot_work) {
855                                         sendreal:
856                                                 if (netflow->SeqOffset)
857                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
858                                                 ret = send(peers[i].write_fd, emit_packet, size, 0);
859                                                 if (ret < size) {
860 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
861                                                         my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
862                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
863 #endif
864                                                 }
865 #if ((DEBUG) & DEBUG_E)
866                                                 commaneelse {
867                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
868                                                                 emit_count, i + 1, peers[i].seq);
869                                                 }
870 #endif
871                                                 peers[i].seq += emit_count;
872
873                                                 /* Rate limit */
874                                                 if (emit_rate_bytes) {
875                                                         sent += size;
876                                                         delay = sent / emit_rate_bytes;
877                                                         if (delay) {
878                                                                 sent %= emit_rate_bytes;
879                                                                 timeout.tv_sec = 0;
880                                                                 timeout.tv_nsec = emit_rate_delay * delay;
881                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
882                                                         }
883                                                 }
884                                         }
885                         }
886                         if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
887                         emit_sequence += emit_count;
888                         emit_count = 0;
889 #if ((DEBUG) & DEBUG_I)
890                         emit_pkts++;
891 #endif
892                 }
893         }
894 }       
895
896 void *unpending_thread()
897 {
898         struct timeval now;
899         struct timespec timeout;
900 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
901         char logbuf[256];
902 #endif
903
904         setuser();
905
906         timeout.tv_nsec = 0;
907         pthread_mutex_lock(&unpending_mutex);
908
909         for (;;) {
910                 while (!(pending_tail->flags & FLOW_PENDING)) {
911                         gettimeofday(&now, 0);
912                         timeout.tv_sec = now.tv_sec + unpending_timeout;
913                         pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
914                 }
915
916 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
917                 *logbuf = 0;
918 #endif
919                 if (put_into(pending_tail, COPY_INTO
920 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
921                                 , logbuf
922 #endif
923                         ) < 0) {
924 #if ((DEBUG) & DEBUG_I)
925                         pkts_lost_unpending++;
926 #endif                          
927                 }
928
929 #if ((DEBUG) & DEBUG_U)
930                 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
931 #endif
932
933                 pending_tail->flags = 0;
934                 pending_tail = pending_tail->next;
935 #if ((DEBUG) & DEBUG_I)
936                 pkts_pending_done++;
937 #endif
938         }
939 }
940
941 void *scan_thread()
942 {
943 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
944         char logbuf[256];
945 #endif
946         int i;
947         struct Flow *flow, **flowpp;
948         struct Time now;
949         struct timespec timeout;
950
951         setuser();
952
953         timeout.tv_nsec = 0;
954         pthread_mutex_lock(&scan_mutex);
955
956         for (;;) {
957                 gettime(&now);
958                 timeout.tv_sec = now.sec + scan_interval;
959                 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
960
961                 gettime(&now);
962 #if ((DEBUG) & DEBUG_S)
963                 my_log(LOG_DEBUG, "S: %d", now.sec);
964 #endif
965                 for (i = 0; i < 1 << HASH_BITS ; i++) {
966                         pthread_mutex_lock(&flows_mutex[i]);
967                         flow = flows[i];
968                         flowpp = &flows[i];
969                         while (flow) {
970                                 if (flow->flags & FLOW_FRAG) {
971                                         /* Process fragmented flow */
972                                         if ((now.sec - flow->mtime.sec) > frag_lifetime) {
973                                                 /* Fragmented flow expired - put it into special chain */
974 #if ((DEBUG) & DEBUG_I)
975                                                 flows_fragmented--;
976                                                 flows_total--;
977 #endif
978                                                 *flowpp = flow->next;
979                                                 flow->id = 0;
980                                                 flow->flags &= ~FLOW_FRAG;
981                                                 flow->next = scan_frag_dreg;
982                                                 scan_frag_dreg = flow;
983                                                 flow = *flowpp;
984                                                 continue;
985                                         }
986                                 } else {
987                                         /* Flow is not frgamented */
988                                         if ((now.sec - flow->mtime.sec) > inactive_lifetime
989                                                 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
990                                                 /* Flow expired */
991 #if ((DEBUG) & DEBUG_S)
992                                                 my_log(LOG_DEBUG, "S: E %x", flow);
993 #endif
994 #if ((DEBUG) & DEBUG_I)
995                                                 flows_total--;
996 #endif
997                                                 *flowpp = flow->next;
998                                                 pthread_mutex_lock(&emit_mutex);
999                                                 flow->next = flows_emit;
1000                                                 flows_emit = flow;
1001 #if ((DEBUG) & DEBUG_I)
1002                                                 emit_queue++;
1003 #endif                          
1004                                                 pthread_mutex_unlock(&emit_mutex);
1005                                                 flow = *flowpp;
1006                                                 continue;
1007                                         }
1008                                 }
1009                                 flowpp = &flow->next;
1010                                 flow = flow->next;
1011                         } /* chain loop */
1012                         pthread_mutex_unlock(&flows_mutex[i]);
1013                 } /* hash loop */
1014                 if (flows_emit) pthread_cond_signal(&emit_cond);
1015
1016                 while (scan_frag_dreg) {
1017                         flow = scan_frag_dreg;
1018                         scan_frag_dreg = flow->next;
1019 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1020                         *logbuf = 0;
1021 #endif
1022                         put_into(flow, MOVE_INTO
1023 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1024                                 , logbuf
1025 #endif
1026                         );
1027 #if ((DEBUG) & DEBUG_S)
1028                         my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
1029 #endif
1030                 }
1031         }
1032 }
1033
1034 void *cap_thread()
1035 {
1036         struct ulog_packet_msg *ulog_msg;
1037         struct ip *nl;
1038         void *tl;
1039         struct Flow *flow;
1040         int len, off_frag, psize;
1041 #if ((DEBUG) & DEBUG_C)
1042         char buf[64];
1043         char logbuf[256];
1044 #endif
1045
1046         setuser();
1047
1048         while (!killed) {
1049                 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1050                 if (len <= 0) {
1051                         my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1052                         continue;
1053                 }
1054                 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1055
1056 #if ((DEBUG) & DEBUG_C)
1057                         sprintf(logbuf, "C: %d", ulog_msg->data_len);
1058 #endif
1059
1060                         nl = (void *) &ulog_msg->payload;
1061                         psize = ulog_msg->data_len;
1062
1063                         /* Sanity check */
1064                         if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1065 #if ((DEBUG) & DEBUG_C)
1066                                 strcat(logbuf, " U");
1067                                 my_log(LOG_DEBUG, "%s", logbuf);
1068 #endif
1069 #if ((DEBUG) & DEBUG_I)
1070                                 pkts_ignored++;
1071 #endif
1072                                 continue;
1073                         }
1074
1075                         if (pending_head->flags) {
1076 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1077                                 my_log(LOG_ERR,
1078 # if ((DEBUG) & DEBUG_C)
1079                                         "%s %s %s", logbuf,
1080 # else
1081                                         "%s %s",
1082 # endif
1083                                         "pending queue full:", "packet lost");
1084 #endif
1085 #if ((DEBUG) & DEBUG_I)
1086                                 pkts_lost_capture++;
1087 #endif
1088                                 goto done;
1089                         }
1090
1091 #if ((DEBUG) & DEBUG_I)
1092                         pkts_total++;
1093 #endif
1094
1095                         flow = pending_head;
1096
1097                         /* ?FIXME? Add sanity check for ip_len? */
1098                         flow->size = ntohs(nl->ip_len);
1099 #if ((DEBUG) & DEBUG_I)
1100                         size_total += flow->size;
1101 #endif
1102
1103                         flow->sip = nl->ip_src;
1104                         flow->dip = nl->ip_dst;
1105                         if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
1106                                 my_log(LOG_INFO, "Received test flow to corewars.org");
1107                         }
1108                         flow->iif = snmp_index(ulog_msg->indev_name);
1109                         flow->oif = snmp_index(ulog_msg->outdev_name);
1110                         flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1111                         flow->proto = nl->ip_p;
1112                         flow->id = 0;
1113                         flow->tcp_flags = 0;
1114                         flow->pkts = 1;
1115                         flow->sizeF = 0;
1116                         flow->sizeP = 0;
1117                         /* Packets captured from OUTPUT table didn't contains valid timestamp */
1118                         if (ulog_msg->timestamp_sec) {
1119                                 flow->ctime.sec = ulog_msg->timestamp_sec;
1120                                 flow->ctime.usec = ulog_msg->timestamp_usec;
1121                         } else gettime(&flow->ctime);
1122                         flow->mtime = flow->ctime;
1123
1124                         off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1125
1126                         /*
1127                         Offset (from network layer) to transport layer header/IP data
1128                         IOW IP header size ;-)
1129
1130                         ?FIXME?
1131                         Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1132                         */
1133                         off_tl = nl->ip_hl << 2;
1134                         tl = (void *) nl + off_tl;
1135
1136                         /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1137                         flow->sizeF = ntohs(nl->ip_len) - off_tl;
1138                         psize -= off_tl;
1139                         if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1140                         if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1141
1142                         if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1143                                 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1144 #if ((DEBUG) & DEBUG_C)
1145                                 strcat(logbuf, " F");
1146 #endif
1147 #if ((DEBUG) & DEBUG_I)
1148                                 pkts_total_fragmented++;
1149 #endif
1150                                 flow->flags |= FLOW_FRAG;
1151                                 flow->id = nl->ip_id;
1152
1153                                 if (!(ntohs(nl->ip_off) & IP_MF)) {
1154                                         /* Packet whith IP_MF contains information about whole datagram size */
1155                                         flow->flags |= FLOW_LASTFRAG;
1156                                         /* size = frag_offset*8 + data_size */
1157                                         flow->sizeP = off_frag + flow->sizeF;
1158                                 }
1159                         }
1160
1161 #if ((DEBUG) & DEBUG_C)
1162                         sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1163                         strcat(logbuf, buf);
1164                         sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1165                         strcat(logbuf, buf);
1166 #endif
1167
1168                         /*
1169                         Fortunately most interesting transport layer information fit
1170                         into first 8 bytes of IP data field (minimal nonzero size).
1171                         Thus we don't need actual packet reassembling to build whole
1172                         transport layer data. We only check the fragment offset for
1173                         zero value to find packet with this information.
1174                         */
1175                         if (!off_frag && psize >= 8) {
1176                                 switch (flow->proto) {
1177                                         case IPPROTO_TCP:
1178                                         case IPPROTO_UDP:
1179                                                 flow->sp = ((struct udphdr *)tl)->uh_sport;
1180                                                 flow->dp = ((struct udphdr *)tl)->uh_dport;
1181                                                 goto tl_known;
1182
1183 #ifdef ICMP_TRICK
1184                                         case IPPROTO_ICMP:
1185                                                 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1186                                                 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1187                                                 goto tl_known;
1188 #endif
1189 #ifdef ICMP_TRICK_CISCO
1190                                         case IPPROTO_ICMP:
1191                                                 flow->dp = *((int32_t *) tl);
1192                                                 goto tl_known;
1193 #endif
1194
1195                                         default:
1196                                                 /* Unknown transport layer */
1197 #if ((DEBUG) & DEBUG_C)
1198                                                 strcat(logbuf, " U");
1199 #endif
1200                                                 flow->sp = 0;
1201                                                 flow->dp = 0;
1202                                                 break;
1203
1204                                         tl_known:
1205 #if ((DEBUG) & DEBUG_C)
1206                                                 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1207                                                 strcat(logbuf, buf);
1208 #endif
1209                                                 flow->flags |= FLOW_TL;
1210                                 }
1211                         }
1212
1213                         /* Check for tcp flags presence (including CWR and ECE). */
1214                         if (flow->proto == IPPROTO_TCP
1215                                 && off_frag < 16
1216                                 && psize >= 16 - off_frag) {
1217                                 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1218 #if ((DEBUG) & DEBUG_C)
1219                                 sprintf(buf, " TCP:%x", flow->tcp_flags);
1220                                 strcat(logbuf, buf);
1221 #endif
1222                         }
1223
1224 #if ((DEBUG) & DEBUG_C)
1225                         sprintf(buf, " => %x", (unsigned) flow);
1226                         strcat(logbuf, buf);
1227                         my_log(LOG_DEBUG, "%s", logbuf);
1228 #endif
1229
1230 #if ((DEBUG) & DEBUG_I)
1231                         pkts_pending++;
1232                         pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1233                         if (pending_queue_trace < pending_queue_trace_candidate)
1234                                 pending_queue_trace = pending_queue_trace_candidate;
1235 #endif
1236
1237                         /* Flow complete - inform unpending_thread() about it */
1238                         pending_head->flags |= FLOW_PENDING;
1239                         pending_head = pending_head->next;
1240                 done:
1241                         pthread_cond_signal(&unpending_cond);
1242                 }
1243         }
1244         return 0;
1245 }
1246
1247 int main(int argc, char **argv)
1248 {
1249         char errpbuf[512];
1250         char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1251         int c, i, write_fd, memory_limit = 0;
1252         struct addrinfo hints, *res;
1253         struct sockaddr_in saddr;
1254         pthread_attr_t tattr;
1255         struct sigaction sigact;
1256         static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1257         struct timeval timeout;
1258
1259         sched_min = sched_get_priority_min(SCHED);
1260         sched_max = sched_get_priority_max(SCHED);
1261
1262         memset(&saddr, 0 , sizeof(saddr));
1263         memset(&hints, 0 , sizeof(hints));
1264         hints.ai_flags = AI_PASSIVE;
1265         hints.ai_family = AF_INET;
1266         hints.ai_socktype = SOCK_DGRAM;
1267
1268         /* Process command line options */
1269
1270         opterr = 0;
1271         while ((c = my_getopt(argc, argv, parms)) != -1) {
1272                 switch (c) {
1273                         case '?':
1274                                 usage();
1275
1276                         case 'h':
1277                                 usage();
1278                 }
1279         }
1280
1281         if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1282         if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1283         if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1284         if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1285         if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1286         if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1287         if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1288         if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1289         if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1290         if (parms[nflag].count) {
1291                 switch (atoi(parms[nflag].arg)) {
1292                         case 1:
1293                                 netflow = &NetFlow1;
1294                                 break;
1295
1296                         case 5:
1297                                 break;
1298
1299                         case 7:
1300                                 netflow = &NetFlow7;
1301                                 break;
1302
1303                         default:
1304                                 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1305                                 exit(1);
1306                 }
1307         }
1308         if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1309         if (parms[lflag].count) {
1310                 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1311                         *log_suffix++ = 0;
1312                         if (*log_suffix) {
1313                                 sprintf(errpbuf, "[%s]", log_suffix);
1314                                 strcat(ident, errpbuf);
1315                         }
1316                 }
1317                 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1318                 if (log_suffix) *--log_suffix = ':';
1319         }
1320         if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1321         err_malloc:
1322                 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1323                 exit(1);
1324         }
1325         sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1326         if (parms[qflag].count) {
1327                 pending_queue_length = atoi(parms[qflag].arg);
1328                 if (pending_queue_length < 1) {
1329                         fprintf(stderr, "Illegal %s\n", "pending queue length");
1330                         exit(1);
1331                 }
1332         }
1333         if (parms[rflag].count) {
1334                 schedp.sched_priority = atoi(parms[rflag].arg);
1335                 if (schedp.sched_priority
1336                         && (schedp.sched_priority < sched_min
1337                                 || schedp.sched_priority > sched_max)) {
1338                         fprintf(stderr, "Illegal %s\n", "realtime priority");
1339                         exit(1);
1340                 }
1341         }
1342         if (parms[Bflag].count) {
1343                 sockbufsize = atoi(parms[Bflag].arg) << 10;
1344         }
1345         if (parms[bflag].count) {
1346                 bulk_quantity = atoi(parms[bflag].arg);
1347                 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1348                         fprintf(stderr, "Illegal %s\n", "bulk size");
1349                         exit(1);
1350                 }
1351         }
1352         if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1353         if (parms[Xflag].count) {
1354                 for(i = 0; parms[Xflag].arg[i]; i++)
1355                         if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1356                 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1357                         goto err_malloc;
1358                 rule = strtok(parms[Xflag].arg, ":");
1359                 for (i = 0; rule; i++) {
1360                         snmp_rules[i].len = strlen(rule);
1361                         if (snmp_rules[i].len > IFNAMSIZ) {
1362                                 fprintf(stderr, "Illegal %s\n", "interface basename");
1363                                 exit(1);
1364                         }
1365                         strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1366                         if (!*(rule - 1)) *(rule - 1) = ',';
1367                         rule = strtok(NULL, ",");
1368                         if (!rule) {
1369                                 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1370                                 exit(1);
1371                         }
1372                         snmp_rules[i].base = atoi(rule);
1373                         *(rule - 1) = ':';
1374                         rule = strtok(NULL, ":");
1375                 }
1376                 nsnmp_rules = i;
1377         }
1378         if (parms[tflag].count)
1379                 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1380         if (parms[aflag].count) {
1381                 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1382                 bad_lhost:
1383                         fprintf(stderr, "Illegal %s\n", "source address");
1384                         exit(1);
1385                 } else {
1386                         saddr = *((struct sockaddr_in *) res->ai_addr);
1387                         freeaddrinfo(res);
1388                 }
1389         }
1390         if (parms[uflag].count) 
1391                 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1392                         fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1393                         exit(1);
1394                 }
1395
1396
1397         /* Process collectors parameters. Brrrr... :-[ */
1398
1399         npeers = argc - optind;
1400         if (npeers > 1) {
1401                 /* Send to remote Netflow collector */
1402                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1403                 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1404                         dhost = argv[i];
1405                         if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1406                         *dport++ = 0;
1407                         if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1408                                 fprintf(stderr, "socket(): %s\n", strerror(errno));
1409                                 exit(1);
1410                         }
1411                         peers[npeers].write_fd = write_fd;
1412                         peers[npeers].type = PEER_MIRROR;
1413                         peers[npeers].laddr = saddr;
1414                         peers[npeers].seq = 0;
1415                         if ((lhost = strchr(dport, '/'))) {
1416                                 *lhost++ = 0;
1417                                 if ((type = strchr(lhost, '/'))) {
1418                                         *type++ = 0;
1419                                         switch (*type) {
1420                                                 case 0:
1421                                                 case 'm':
1422                                                         break;
1423
1424                                                 case 'r':
1425                                                         peers[npeers].type = PEER_ROTATE;
1426                                                         npeers_rot++;
1427                                                         break;
1428
1429                                                 default:
1430                                                         goto bad_collector;
1431                                         }
1432                                 }
1433                                 if (*lhost) {
1434                                         if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1435                                         peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1436                                         freeaddrinfo(res);
1437                                 }
1438                         }
1439                         if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1440                                                 sizeof(struct sockaddr_in))) {
1441                                 fprintf(stderr, "bind(): %s\n", strerror(errno));
1442                                 exit(1);
1443                         }
1444                         if (getaddrinfo(dhost, dport, &hints, &res)) {
1445 bad_collector:
1446                                 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1447                                 exit(1);
1448                         }
1449                         peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1450                         freeaddrinfo(res);
1451                         if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1452                                                 sizeof(struct sockaddr_in))) {
1453                                 fprintf(stderr, "connect(): %s\n", strerror(errno));
1454                                 exit(1);
1455                         }
1456
1457                         /* Restore command line */
1458                         if (type) *--type = '/';
1459                         if (lhost) *--lhost = '/';
1460                         *--dport = ':';
1461                 }
1462         }
1463         else if (parms[fflag].count) {
1464                 // log into a file
1465                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1466                 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1467                 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1468                 
1469                 peers[npeers].write_fd = START_VALUE;
1470                 peers[npeers].type = PEER_FILE;
1471                 peers[npeers].seq = 0;
1472
1473                 get_cur_epoch();
1474                 npeers++;
1475         }
1476         else 
1477                 usage();
1478
1479
1480         if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1481         ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1482         if (!ulog_handle) {
1483                 fprintf(stderr, "libipulog initialization error: %s",
1484                         ipulog_strerror(ipulog_errno));
1485                 exit(1);
1486         }
1487         if (sockbufsize)
1488                 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1489                         &sockbufsize, sizeof(sockbufsize)) < 0)
1490                         fprintf(stderr, "setsockopt(): %s", strerror(errno));
1491
1492         /* Daemonize (if log destination stdout-free) */
1493
1494         my_log_open(ident, verbosity, log_dest);
1495         if (!(log_dest & 2)) {
1496                 /* Crash-proofing - Sapan*/
1497                 while (1) {
1498                         int pid=fork();
1499                         if (pid==-1) {
1500                                         fprintf(stderr, "fork(): %s", strerror(errno));
1501                                         exit(1);
1502                         }
1503                         else if (pid==0) {
1504                                         setsid();
1505                                         freopen("/dev/null", "r", stdin);
1506                                         freopen("/dev/null", "w", stdout);
1507                                         freopen("/dev/null", "w", stderr);
1508                                         break;
1509                         }
1510                         else {
1511                                 while (wait3(NULL,0,NULL) < 1);
1512                         }
1513                 }
1514         } else {
1515                 setvbuf(stdout, (char *)0, _IONBF, 0);
1516                 setvbuf(stderr, (char *)0, _IONBF, 0);
1517         }
1518
1519         pid = getpid();
1520         sprintf(errpbuf, "[%ld]", (long) pid);
1521         strcat(ident, errpbuf);
1522
1523         /* Initialization */
1524
1525         hash_init(); /* Actually for crc16 only */
1526         mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1527         for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1528
1529 #ifdef UPTIME_TRICK
1530         /* Hope 12 days is enough :-/ */
1531         start_time_offset = 1 << 20;
1532
1533         /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1534 #endif
1535         gettime(&start_time);
1536
1537         /*
1538         Build static pending queue as circular buffer.
1539         */
1540         if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1541         pending_tail = pending_head;
1542         for (i = pending_queue_length - 1; i--;) {
1543                 if (!(pending_tail->next = mem_alloc())) {
1544                 err_mem_alloc:
1545                         my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1546                         exit(1);
1547                 }
1548                 pending_tail = pending_tail->next;
1549         }
1550         pending_tail->next = pending_head;
1551         pending_tail = pending_head;
1552
1553         sigemptyset(&sig_mask);
1554         sigact.sa_handler = &sighandler;
1555         sigact.sa_mask = sig_mask;
1556         sigact.sa_flags = 0;
1557         sigaddset(&sig_mask, SIGTERM);
1558         sigaction(SIGTERM, &sigact, 0);
1559 #if ((DEBUG) & DEBUG_I)
1560         sigaddset(&sig_mask, SIGUSR1);
1561         sigaction(SIGUSR1, &sigact, 0);
1562 #endif
1563         if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1564                 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1565                 exit(1);
1566         }
1567
1568         my_log(LOG_INFO, "Starting %s...", VERSION);
1569
1570         if (parms[cflag].count) {
1571                 if (chdir(parms[cflag].arg) || chroot(".")) {
1572                         my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1573                         exit(1);
1574                 }
1575         }
1576
1577         schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1578         pthread_attr_init(&tattr);
1579         for (i = 0; i < THREADS - 1; i++) {
1580                 if (schedp.sched_priority > 0) {
1581                         if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1582                                 (pthread_attr_setschedparam(&tattr, &schedp))) {
1583                                 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1584                                 exit(1);
1585                         }
1586                 }
1587                 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1588                         my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1589                         exit(1);
1590                 }
1591                 pthread_detach(thid);
1592                 schedp.sched_priority++;
1593         }
1594
1595         if (pw) {
1596                 if (setgroups(0, NULL)) {
1597                         my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1598                         exit(1);
1599                 }
1600                 if (setregid(pw->pw_gid, pw->pw_gid)) {
1601                         my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1602                         exit(1);
1603                 }
1604                 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1605                         my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1606                         exit(1);
1607                 }
1608         }
1609
1610         if (!(pidfile = fopen(pidfilepath, "w")))
1611                 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1612         else {
1613                 fprintf(pidfile, "%ld\n", (long) pid);
1614                 fclose(pidfile);
1615         }
1616
1617         my_log(LOG_INFO, "pid: %d", pid);
1618         my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1619                 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1620                 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1621                 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1622                 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1623                 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1624                 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1625         for (i = 0; i < nsnmp_rules; i++) {
1626                 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1627                         i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1628         }
1629         for (i = 0; i < npeers; i++) {
1630                 switch (peers[i].type) {
1631                         case PEER_MIRROR:
1632                                 c = 'm';
1633                                 break;
1634                         case PEER_ROTATE:
1635                                 c = 'r';
1636                                 break;
1637                 }
1638                 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1639                 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1640                         inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1641         }
1642
1643         pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1644
1645         timeout.tv_usec = 0;
1646         while (!killed
1647                 || (total_elements - free_elements - pending_queue_length)
1648                 || emit_count
1649                 || pending_tail->flags) {
1650
1651                 if (!sigs) {
1652                         timeout.tv_sec = scan_interval;
1653                         select(0, 0, 0, 0, &timeout);
1654                 }
1655
1656                 if (sigs & SIGTERM_MASK && !killed) {
1657                         sigs &= ~SIGTERM_MASK;
1658                         my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1659                         scan_interval = 1;
1660                         frag_lifetime = -1;
1661                         active_lifetime = -1;
1662                         inactive_lifetime = -1;
1663                         emit_timeout = 1;
1664                         unpending_timeout = 1;
1665                         killed = 1;
1666                         pthread_cond_signal(&scan_cond);
1667                         pthread_cond_signal(&unpending_cond);
1668                 }
1669
1670 #if ((DEBUG) & DEBUG_I)
1671                 if (sigs & SIGUSR1_MASK) {
1672                         sigs &= ~SIGUSR1_MASK;
1673                         info_debug();
1674                 }
1675 #endif
1676         }
1677         remove(pidfilepath);
1678 #if ((DEBUG) & DEBUG_I)
1679         info_debug();
1680 #endif
1681         my_log(LOG_INFO, "Done.");
1682 #ifdef WALL
1683         return 0;
1684 #endif
1685 }