Fixed bugs that Daniel pointed out.
[fprobe-ulog.git] / src / fprobe-ulog.c
1 /*
2         Copyright (C) Slava Astashonok <sla@0n.ru>
3
4         This program is free software; you can redistribute it and/or
5         modify it under the terms of the GNU General Public License.
6
7         $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
8
9         7/11/2007       Sapan Bhatia <sapanb@cs.princeton.edu> 
10                         
11                 Added data collection (-f) functionality, xid support in the header and log file
12                 rotation.
13 */
14
15 #include <common.h>
16
17 /* stdout, stderr, freopen() */
18 #include <stdio.h>
19
20 /* atoi(), exit() */
21 #include <stdlib.h>
22
23 /* getopt(), alarm(), getpid(), setsid(), chdir() */
24 #include <unistd.h>
25
26 /* strerror() */
27 #include <string.h>
28
29 /* sig*() */
30 #include <signal.h>
31
32 /* statfs() */
33
34 #include <sys/statfs.h>
35
36 #include <libipulog/libipulog.h>
/*
 * Local mirror of libipulog's handle layout, presumably because
 * <libipulog/libipulog.h> only forward-declares struct ipulog_handle and this
 * file needs direct access to its members.
 * NOTE(review): member order and types must match the installed libipulog
 * exactly - re-check whenever the library is upgraded.
 */
struct ipulog_handle {
	int fd;				/* socket descriptor */
	u_int8_t blocking;
	struct sockaddr_nl local;	/* our netlink address */
	struct sockaddr_nl peer;	/* remote (kernel) netlink address */
	struct nlmsghdr *last_nlhdr;
};
44
45 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
46 #include <sys/types.h>
47 #include <netinet/in_systm.h>
48 #include <sys/socket.h>
49 #include <netinet/in.h>
50 #include <arpa/inet.h>
51 #include <netinet/ip.h>
52 #include <netinet/tcp.h>
53 #include <netinet/udp.h>
54 #include <netinet/ip_icmp.h>
55 #include <net/if.h>
56
57 #include <sys/param.h>
58 #include <pwd.h>
59 #ifdef OS_LINUX
60 #include <grp.h>
61 #endif
62
63 /* pthread_*() */
64 #include <pthread.h>
65
66 /* errno */
67 #include <errno.h>
68
69 /* getaddrinfo() */
70 #include <netdb.h>
71
72 /* nanosleep() */
73 #include <time.h>
74
75 /* gettimeofday() */
76 #include <sys/time.h>
77
78 /* scheduling */
79 #include <sched.h>
80
81 /* select() (POSIX)*/
82 #include <sys/select.h>
83
84 /* open() */
85 #include <sys/stat.h>
86 #include <fcntl.h>
87
88 #include <fprobe-ulog.h>
89 #include <my_log.h>
90 #include <my_getopt.h>
91 #include <netflow.h>
92 #include <hash.h>
93 #include <mem.h>
94
95 enum {
96         aflag,
97         Bflag,
98         bflag,
99         cflag,
100         dflag,
101         Dflag,
102         eflag,
103         Eflag,
104         fflag,
105         gflag,
106         hflag,
107         lflag,
108         mflag,
109         Mflag,
110         nflag,
111         qflag,
112         rflag,
113         sflag,
114         tflag,
115         Tflag,
116         Uflag,
117         uflag,
118         vflag,
119         Wflag,
120         Xflag,
121 };
122
123 static struct getopt_parms parms[] = {
124         {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
125         {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
126         {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
127         {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
128         {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
129         {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
130         {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
131         {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
132         {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
133         {'h', 0, 0, 0},
134         {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
135         {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
136         {'M', 0, 0, 0},
137         {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
138         {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
139         {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
140         {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
141         {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
142         {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
143         {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
144         {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
145         {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
146         {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
147         {0, 0, 0, 0}
148 };
149
150 extern char *optarg;
151 extern int optind, opterr, optopt;
152 extern int errno;
153
154 extern struct NetFlow NetFlow1;
155 extern struct NetFlow NetFlow5;
156 extern struct NetFlow NetFlow7;
157
158 #define START_VALUE -5
159 #define mark_is_tos parms[Mflag].count
160 static unsigned scan_interval = 5;
161 static int min_free = 0;
162 static int frag_lifetime = 30;
163 static int inactive_lifetime = 60;
164 static int active_lifetime = 300;
165 static int sockbufsize;
166 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
167 #if (MEM_BITS == 0) || (MEM_BITS == 16)
168 #define BULK_QUANTITY 10000
169 #else
170 #define BULK_QUANTITY 200
171 #endif
172
173 static unsigned epoch_length=60, log_epochs=1; 
174 static unsigned cur_epoch=0,prev_uptime=0;
175
176 static unsigned bulk_quantity = BULK_QUANTITY;
177 static unsigned pending_queue_length = 100;
178 static struct NetFlow *netflow = &NetFlow5;
179 static unsigned verbosity = 6;
180 static unsigned log_dest = MY_LOG_SYSLOG;
181 static struct Time start_time;
182 static long start_time_offset;
183 static int off_tl;
184 /* From mem.c */
185 extern unsigned total_elements;
186 extern unsigned free_elements;
187 extern unsigned total_memory;
188 #if ((DEBUG) & DEBUG_I)
189 static unsigned emit_pkts, emit_queue;
190 static uint64_t size_total;
191 static unsigned pkts_total, pkts_total_fragmented;
192 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
193 static unsigned pkts_pending, pkts_pending_done;
194 static unsigned pending_queue_trace, pending_queue_trace_candidate;
195 static unsigned flows_total, flows_fragmented;
196 #endif
197 static unsigned emit_count;
198 static uint32_t emit_sequence;
199 static unsigned emit_rate_bytes, emit_rate_delay;
200 static struct Time emit_time;
201 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
202 static pthread_t thid;
203 static sigset_t sig_mask;
204 static struct sched_param schedp;
205 static int sched_min, sched_max;
206 static int npeers, npeers_rot;
207 static struct peer *peers;
208 static int sigs;
209
210 static struct Flow *flows[1 << HASH_BITS];
211 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
212
213 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
214 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
215
216 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
217 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
218 static struct Flow *pending_head, *pending_tail;
219 static struct Flow *scan_frag_dreg;
220
221 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
222 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
223 static struct Flow *flows_emit;
224
225 static char ident[256] = "fprobe-ulog";
226 static FILE *pidfile;
227 static char *pidfilepath;
228 static pid_t pid;
229 static int killed;
230 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
231 static struct ipulog_handle *ulog_handle;
232 static uint32_t ulog_gmask = 1;
233 static char *cap_buf;
234 static int nsnmp_rules;
235 static struct snmp_rule *snmp_rules;
236 static struct passwd *pw = 0;
237
238 void usage()
239 {
240         fprintf(stdout,
241                 "fprobe-ulog: a NetFlow probe. Version %s\n"
242                 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
243                 "\n"
244                 "-h\t\tDisplay this help\n"
245                 "-U <mask>\tULOG group bitwise mask [1]\n"
246                 "-s <seconds>\tHow often scan for expired flows [5]\n"
247                 "-g <seconds>\tFragmented flow lifetime [30]\n"
248                 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
249                 "-f <filename>\tLog flow data in a file\n"
250                 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
251                 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
252                 "-a <address>\tUse <address> as source for NetFlow flow\n"
253                 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
254                 "-M\t\tUse netfilter mark value as ToS flag\n"
255                 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
256                 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
257                 "-q <flows>\tPending queue length [100]\n"
258                 "-B <kilobytes>\tKernel capture buffer size [0]\n"
259                 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
260                 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
261                 "-c <directory>\tDirectory to chroot to\n"
262                 "-u <user>\tUser to run as\n"
263                 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
264                 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
265                 "-y <remote:port>\tAddress of the NetFlow collector\n"
266                 "-f <writable file>\tFile to write data into\n"
267                 "-T <n>\tRotate log file every n epochs\n"
268                 "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
269                 "-E <[1..60]>\tSize of an epoch in minutes\n"
270                 "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
271                 ,
272                 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
273         exit(0);
274 }
275
#if ((DEBUG) & DEBUG_I)
/*
 * Dump the DEBUG_I statistics counters to the log at LOG_DEBUG level.
 * All counters are unsigned (size_total is uint64_t), so the format
 * specifiers are %u / %llu to match the argument types exactly.
 */
void info_debug()
{
	my_log(LOG_DEBUG, "I: received:%u/%u (%llu) pending:%u/%u",
		pkts_total, pkts_total_fragmented, (unsigned long long) size_total,
		pkts_pending - pkts_pending_done, pending_queue_trace);
	my_log(LOG_DEBUG, "I: ignored:%u lost:%u+%u",
		pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
	my_log(LOG_DEBUG, "I: cache:%u/%u emit:%u/%u/%u",
		flows_total, flows_fragmented, (unsigned) emit_sequence, emit_pkts, emit_queue);
	my_log(LOG_DEBUG, "I: memory:%u/%u (%u)",
		total_elements, free_elements, total_memory);
}
#endif
290
291 void sighandler(int sig)
292 {
293         switch (sig) {
294                 case SIGTERM:
295                         sigs |= SIGTERM_MASK;
296                         break;
297 #if ((DEBUG) & DEBUG_I)
298                 case SIGUSR1:
299                         sigs |= SIGUSR1_MASK;
300                         break;
301 #endif
302         }
303 }
304
305 void gettime(struct Time *now)
306 {
307         struct timeval t;
308
309         gettimeofday(&t, 0);
310         now->sec = t.tv_sec;
311         now->usec = t.tv_usec;
312 }
313
314
/*
 * Difference t1 - t2 in whole minutes.
 * Only the seconds fields are used (usec is ignored), and the result is
 * truncated by integer division.
 * NOTE(review): the subtraction happens in the type of struct Time's `sec`
 * member, which is declared elsewhere. A negative result (t1 earlier than t2)
 * is only meaningful if that type is signed - confirm.
 */
inline time_t cmpMtime(struct Time *t1, struct Time *t2)
{
	return (t1->sec - t2->sec)/60;
}
319
/*
 * Difference t1 - t2 in milliseconds.
 * Computed as (seconds difference * 1000) + (microseconds difference / 1000).
 * The second term uses integer division, so sub-millisecond parts are
 * dropped. A gap of less than 1 ms can therefore yield 0.
 * Callers use the sign to order timestamps (see put_into()).
 */
inline time_t cmpmtime(struct Time *t1, struct Time *t2)
{
	return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
}
324
325 /* Uptime in miliseconds */
326 uint32_t getuptime(struct Time *t)
327 {
328         /* Maximum uptime is about 49/2 days */
329         return cmpmtime(t, &start_time);
330 }
331
/*
 * Uptime in minutes: whole minutes elapsed from start_time to *t.
 * Used by get_log_fd() to decide when an epoch is over.
 */
uint32_t getuptime_minutes(struct Time *t)
{
	/* Minute granularity: a uint32_t holds ~8000 years of minutes, so unlike
	   the millisecond getuptime() this does not wrap in practice. */
	return cmpMtime(t, &start_time);
}
338
339 hash_t hash_flow(struct Flow *flow)
340 {
341         if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
342         else return hash(flow, sizeof(struct Flow_TL));
343 }
344
345 uint16_t snmp_index(char *name) {
346         uint32_t i;
347
348         if (!*name) return 0;
349
350         for (i = 0; (int) i < nsnmp_rules; i++) {
351                 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
352                 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
353         }
354
355         if ((i = if_nametoindex(name))) return i;
356
357         return -1;
358 }
359
360 inline void copy_flow(struct Flow *src, struct Flow *dst)
361 {
362         dst->iif = src->iif;
363         dst->oif = src->oif;
364         dst->sip = src->sip;
365         dst->dip = src->dip;
366         dst->tos = src->tos;
367         dst->proto = src->proto;
368         dst->tcp_flags = src->tcp_flags;
369         dst->id = src->id;
370         dst->sp = src->sp;
371         dst->dp = src->dp;
372         dst->pkts = src->pkts;
373         dst->size = src->size;
374         dst->sizeF = src->sizeF;
375         dst->sizeP = src->sizeP;
376         dst->ctime = src->ctime;
377         dst->mtime = src->mtime;
378         dst->flags = src->flags;
379 }
380
381 void get_cur_epoch() {
382         int fd;
383         fd = open("/tmp/fprobe_last_epoch",O_RDONLY);
384         if (fd != -1) {
385                 char snum[7];
386                 ssize_t len;
387                 len = read(fd, snum, sizeof(snum)-1);
388                 if (len != -1) {
389                         snum[len]='\0';
390                         sscanf(snum,"%d",&cur_epoch);
391                         close(fd);
392                 }
393         }
394         return;
395 }
396
397
398 void update_cur_epoch_file(int n) {
399         int fd, len;
400         char snum[7];
401         len=snprintf(snum,6,"%d",n);
402         fd = open("/tmp/fprobe_last_epoch",O_WRONLY|O_CREAT|O_TRUNC);
403         if (fd == -1) {
404                 my_log(LOG_ERR, "open() failed: /tmp/fprobe_last_epoch.The next restart will resume logging from epoch id 0.");
405                 return;
406         }
407         write(fd, snum, len);
408         close(fd);
409 }
410
411 unsigned get_log_fd(char *fname, unsigned cur_fd) {
412         struct Time now;
413         unsigned cur_uptime;
414         /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
415          * doesn't solve the problem */
416
417         struct statfs statfs;
418         int ret_fd;
419         gettime(&now);
420         cur_uptime = getuptime_minutes(&now);
421
422         if (fstatfs(cur_fd, &statfs) && cur_fd!=START_VALUE) {
423                 my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
424         }
425         else {
426                 if (min_free && statfs.f_bfree < min_free) 
427                         switch(cur_epoch) {
428                                 case 0: /* Uh oh. Our first file filled up all of the free space. Just bail out. */
429                                         my_log(LOG_ERR, "The first epoch filled up all the free space on disk. Bailing out.");
430                                         exit(1);
431                                 default:
432                                         my_log(LOG_INFO, "Disk almost full. I'm going to drop data. Max epochs = %d\n",cur_epoch);
433                                         cur_epoch = -1;
434                         }
435         }
436
437         /* Epoch length in minutes */
438         if ((cur_uptime - prev_uptime) > epoch_length || cur_fd<0 || cur_epoch==-1) {
439                 char nextname[MAX_PATH_LEN];
440                 int write_fd;
441                 prev_uptime = cur_uptime;
442                 cur_epoch = (cur_epoch + 1) % log_epochs;
443                 close(cur_fd);
444                 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
445                 if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC)) < 0) {
446                         my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
447                         exit(1);
448                 }
449                 update_cur_epoch_file(cur_epoch);
450                 ret_fd = write_fd;
451         }
452         else
453                 ret_fd = cur_fd;
454         return(ret_fd);
455 }
456
457 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
458 {
459         struct Flow **flowpp;
460
461 #ifdef WALL
462         flowpp = 0;
463 #endif
464
465         if (prev) flowpp = *prev;
466
467         while (where) {
468                 if (where->sip.s_addr == what->sip.s_addr
469                         && where->dip.s_addr == what->dip.s_addr
470                         && where->proto == what->proto) {
471                         switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
472                                 case 0:
473                                         /* Both unfragmented */
474                                         if ((what->sp == where->sp)
475                                                 && (what->dp == where->dp)) goto done;
476                                         break;
477                                 case 2:
478                                         /* Both fragmented */
479                                         if (where->id == what->id) goto done;
480                                         break;
481                         }
482                 }
483                 flowpp = &where->next;
484                 where = where->next;
485         }
486 done:
487         if (prev) *prev = flowpp;
488         return where;
489 }
490
491 int put_into(struct Flow *flow, int flag
492 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
493         , char *logbuf
494 #endif
495 )
496 {
497         int ret = 0;
498         hash_t h;
499         struct Flow *flown, **flowpp;
500 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
501         char buf[64];
502 #endif
503
504         h = hash_flow(flow);
505 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
506         sprintf(buf, " %x H:%04x", (unsigned) flow, h);
507         strcat(logbuf, buf);
508 #endif
509         pthread_mutex_lock(&flows_mutex[h]);
510         flowpp = &flows[h];
511         if (!(flown = find(flows[h], flow, &flowpp))) {
512                 /* No suitable flow found - add */
513                 if (flag == COPY_INTO) {
514                         if ((flown = mem_alloc())) {
515                                 copy_flow(flow, flown);
516                                 flow = flown;
517                         } else {
518 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
519                                 my_log(LOG_ERR, "%s %s. %s",
520                                         "mem_alloc():", strerror(errno), "packet lost");
521 #endif
522                                 return -1;
523                         }
524                 }
525                 flow->next = flows[h];
526                 flows[h] = flow;
527 #if ((DEBUG) & DEBUG_I)
528                 flows_total++;
529                 if (flow->flags & FLOW_FRAG) flows_fragmented++;
530 #endif
531 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
532                 if (flown) {
533                         sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
534                         strcat(logbuf, buf);
535                 }
536 #endif
537         } else {
538                 /* Found suitable flow - update */
539 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
540                 sprintf(buf, " +> %x", (unsigned) flown);
541                 strcat(logbuf, buf);
542 #endif
543                 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
544                         flown->mtime = flow->mtime;
545                 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
546                         flown->ctime = flow->ctime;
547                 flown->tcp_flags |= flow->tcp_flags;
548                 flown->size += flow->size;
549                 flown->pkts += flow->pkts;
550                 if (flow->flags & FLOW_FRAG) {
551                         /* Fragmented flow require some additional work */
552                         if (flow->flags & FLOW_TL) {
553                                 /*
554                                 ?FIXME?
555                                 Several packets with FLOW_TL (attack)
556                                 */
557                                 flown->sp = flow->sp;
558                                 flown->dp = flow->dp;
559                         }
560                         if (flow->flags & FLOW_LASTFRAG) {
561                                 /*
562                                 ?FIXME?
563                                 Several packets with FLOW_LASTFRAG (attack)
564                                 */
565                                 flown->sizeP = flow->sizeP;
566                         }
567                         flown->flags |= flow->flags;
568                         flown->sizeF += flow->sizeF;
569                         if ((flown->flags & FLOW_LASTFRAG)
570                                 && (flown->sizeF >= flown->sizeP)) {
571                                 /* All fragments received - flow reassembled */
572                                 *flowpp = flown->next;
573                                 pthread_mutex_unlock(&flows_mutex[h]);
574 #if ((DEBUG) & DEBUG_I)
575                                 flows_total--;
576                                 flows_fragmented--;
577 #endif
578                                 flown->id = 0;
579                                 flown->flags &= ~FLOW_FRAG;
580 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
581                                 strcat(logbuf," R");
582 #endif
583                                 ret = put_into(flown, MOVE_INTO
584 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
585                                                 , logbuf
586 #endif
587                                         );
588                         }
589                 }
590                 if (flag == MOVE_INTO) mem_free(flow);
591         }
592         pthread_mutex_unlock(&flows_mutex[h]);
593         return ret;
594 }
595
596 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
597 {
598         int i;
599
600         for (i = 0; i < fields; i++) {
601 #if ((DEBUG) & DEBUG_F)
602                 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
603 #endif
604                 switch (format[i]) {
605                         case NETFLOW_IPV4_SRC_ADDR:
606                                 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
607                                 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
608                                 break;
609
610                         case NETFLOW_IPV4_DST_ADDR:
611                                 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
612                                 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
613                                         my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
614                                 }
615                                 p += NETFLOW_IPV4_DST_ADDR_SIZE;
616                                 break;
617
618                         case NETFLOW_INPUT_SNMP:
619                                 *((uint16_t *) p) = htons(flow->iif);
620                                 p += NETFLOW_INPUT_SNMP_SIZE;
621                                 break;
622
623                         case NETFLOW_OUTPUT_SNMP:
624                                 *((uint16_t *) p) = htons(flow->oif);
625                                 p += NETFLOW_OUTPUT_SNMP_SIZE;
626                                 break;
627
628                         case NETFLOW_PKTS_32:
629                                 *((uint32_t *) p) = htonl(flow->pkts);
630                                 p += NETFLOW_PKTS_32_SIZE;
631                                 break;
632
633                         case NETFLOW_BYTES_32:
634                                 *((uint32_t *) p) = htonl(flow->size);
635                                 p += NETFLOW_BYTES_32_SIZE;
636                                 break;
637
638                         case NETFLOW_FIRST_SWITCHED:
639                                 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
640                                 p += NETFLOW_FIRST_SWITCHED_SIZE;
641                                 break;
642
643                         case NETFLOW_LAST_SWITCHED:
644                                 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
645                                 p += NETFLOW_LAST_SWITCHED_SIZE;
646                                 break;
647
648                         case NETFLOW_L4_SRC_PORT:
649                                 *((uint16_t *) p) = flow->sp;
650                                 p += NETFLOW_L4_SRC_PORT_SIZE;
651                                 break;
652
653                         case NETFLOW_L4_DST_PORT:
654                                 *((uint16_t *) p) = flow->dp;
655                                 p += NETFLOW_L4_DST_PORT_SIZE;
656                                 break;
657
658                         case NETFLOW_PROT:
659                                 *((uint8_t *) p) = flow->proto;
660                                 p += NETFLOW_PROT_SIZE;
661                                 break;
662
663                         case NETFLOW_SRC_TOS:
664                                 *((uint8_t *) p) = flow->tos;
665                                 p += NETFLOW_SRC_TOS_SIZE;
666                                 break;
667
668                         case NETFLOW_TCP_FLAGS:
669                                 *((uint8_t *) p) = flow->tcp_flags;
670                                 p += NETFLOW_TCP_FLAGS_SIZE;
671                                 break;
672
673                         case NETFLOW_VERSION:
674                                 *((uint16_t *) p) = htons(netflow->Version);
675                                 p += NETFLOW_VERSION_SIZE;
676                                 break;
677
678                         case NETFLOW_COUNT:
679                                 *((uint16_t *) p) = htons(emit_count);
680                                 p += NETFLOW_COUNT_SIZE;
681                                 break;
682
683                         case NETFLOW_UPTIME:
684                                 *((uint32_t *) p) = htonl(getuptime(&emit_time));
685                                 p += NETFLOW_UPTIME_SIZE;
686                                 break;
687
688                         case NETFLOW_UNIX_SECS:
689                                 *((uint32_t *) p) = htonl(emit_time.sec);
690                                 p += NETFLOW_UNIX_SECS_SIZE;
691                                 break;
692
693                         case NETFLOW_UNIX_NSECS:
694                                 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
695                                 p += NETFLOW_UNIX_NSECS_SIZE;
696                                 break;
697
698                         case NETFLOW_FLOW_SEQUENCE:
699                                 //*((uint32_t *) p) = htonl(emit_sequence);
700                                 *((uint32_t *) p) = 0;
701                                 p += NETFLOW_FLOW_SEQUENCE_SIZE;
702                                 break;
703
704                         case NETFLOW_PAD8:
705                         /* Unsupported (uint8_t) */
706                         case NETFLOW_ENGINE_TYPE:
707                         case NETFLOW_ENGINE_ID:
708                         case NETFLOW_FLAGS7_1:
709                         case NETFLOW_SRC_MASK:
710                         case NETFLOW_DST_MASK:
711                                 *((uint8_t *) p) = 0;
712                                 p += NETFLOW_PAD8_SIZE;
713                                 break;
714                         case NETFLOW_XID:
715                                 *((uint16_t *) p) = flow->tos;
716                                 p += NETFLOW_XID_SIZE;
717                                 break;
718                         case NETFLOW_PAD16:
719                         /* Unsupported (uint16_t) */
720                         case NETFLOW_SRC_AS:
721                         case NETFLOW_DST_AS:
722                         case NETFLOW_FLAGS7_2:
723                                 *((uint16_t *) p) = 0;
724                                 p += NETFLOW_PAD16_SIZE;
725                                 break;
726
727                         case NETFLOW_PAD32:
728                         /* Unsupported (uint32_t) */
729                         case NETFLOW_IPV4_NEXT_HOP:
730                         case NETFLOW_ROUTER_SC:
731                                 *((uint32_t *) p) = 0;
732                                 p += NETFLOW_PAD32_SIZE;
733                                 break;
734
735                         default:
736                                 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
737                                         format, i, format[i]);
738                                 exit(1);
739                 }
740         }
741 #if ((DEBUG) & DEBUG_F)
742         my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
743 #endif
744         return p;
745 }
746
747 void setuser() {
748         /*
749         Workaround for clone()-based threads
750         Try to change EUID independently of main thread
751         */
752         if (pw) {
753                 setgroups(0, NULL);
754                 setregid(pw->pw_gid, pw->pw_gid);
755                 setreuid(pw->pw_uid, pw->pw_uid);
756         }
757 }
758
759 void *emit_thread()
760 {
761         struct Flow *flow;
762         void *p;
763         struct timeval now;
764         struct timespec timeout;
765         int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
766
767         p = (void *) &emit_packet + netflow->HeaderSize;
768         timeout.tv_nsec = 0;
769
770         setuser();
771
772         for (;;) {
773                 pthread_mutex_lock(&emit_mutex);
774                 while (!flows_emit) {
775                         gettimeofday(&now, 0);
776                         timeout.tv_sec = now.tv_sec + emit_timeout;
777                         /* Do not wait until emit_packet will filled - it may be too long */
778                         if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
779                                 pthread_mutex_unlock(&emit_mutex);
780                                 goto sendit;
781                         }
782                 }
783                 flow = flows_emit;
784                 flows_emit = flows_emit->next;
785 #if ((DEBUG) & DEBUG_I)
786                 emit_queue--;
787 #endif          
788                 pthread_mutex_unlock(&emit_mutex);
789
790 #ifdef UPTIME_TRICK
791                 if (!emit_count) {
792                         gettime(&start_time);
793                         start_time.sec -= start_time_offset;
794                 }
795 #endif
796                 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
797                 mem_free(flow);
798                 emit_count++;
799 #ifdef PF2_DEBUG
800                 printf("Emit count = %d\n", emit_count);
801                 fflush(stdout);
802 #endif
803                 if (emit_count == netflow->MaxFlows) {
804                 sendit:
805                         gettime(&emit_time);
806                         p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
807                         size = netflow->HeaderSize + emit_count * netflow->FlowSize;
808                         /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
809                         if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
810                         peer_rot_cur = 0;
811                         for (i = 0; i < npeers; i++) {
812                                 if (peers[i].type == PEER_FILE) {
813                                                 if (netflow->SeqOffset)
814                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
815                                                 peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
816                                                 ret = write(peers[i].write_fd, emit_packet, size);
817                                                 if (ret < size) {
818
819 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
820                                                         my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
821                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
822 #endif
823 #undef MESSAGES
824                                                 }
825 #if ((DEBUG) & DEBUG_E)
826                                                 commaneelse {
827                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
828                                                                 emit_count, i + 1, peers[i].seq);
829                                                 }
830 #endif
831                                                 peers[i].seq += emit_count;
832
833                                                 /* Rate limit */
834                                                 if (emit_rate_bytes) {
835                                                         sent += size;
836                                                         delay = sent / emit_rate_bytes;
837                                                         if (delay) {
838                                                                 sent %= emit_rate_bytes;
839                                                                 timeout.tv_sec = 0;
840                                                                 timeout.tv_nsec = emit_rate_delay * delay;
841                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
842                                                         }
843                                                 }
844                                         }
845                                 else
846                                 if (peers[i].type == PEER_MIRROR) goto sendreal;
847                                 else
848                                 if (peers[i].type == PEER_ROTATE) 
849                                         if (peer_rot_cur++ == peer_rot_work) {
850                                         sendreal:
851                                                 if (netflow->SeqOffset)
852                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
853                                                 ret = send(peers[i].write_fd, emit_packet, size, 0);
854                                                 if (ret < size) {
855 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
856                                                         my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
857                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
858 #endif
859                                                 }
860 #if ((DEBUG) & DEBUG_E)
861                                                 commaneelse {
862                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
863                                                                 emit_count, i + 1, peers[i].seq);
864                                                 }
865 #endif
866                                                 peers[i].seq += emit_count;
867
868                                                 /* Rate limit */
869                                                 if (emit_rate_bytes) {
870                                                         sent += size;
871                                                         delay = sent / emit_rate_bytes;
872                                                         if (delay) {
873                                                                 sent %= emit_rate_bytes;
874                                                                 timeout.tv_sec = 0;
875                                                                 timeout.tv_nsec = emit_rate_delay * delay;
876                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
877                                                         }
878                                                 }
879                                         }
880                         }
881                         if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
882                         emit_sequence += emit_count;
883                         emit_count = 0;
884 #if ((DEBUG) & DEBUG_I)
885                         emit_pkts++;
886 #endif
887                 }
888         }
889 }       
890
891 void *unpending_thread()
892 {
893         struct timeval now;
894         struct timespec timeout;
895 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
896         char logbuf[256];
897 #endif
898
899         setuser();
900
901         timeout.tv_nsec = 0;
902         pthread_mutex_lock(&unpending_mutex);
903
904         for (;;) {
905                 while (!(pending_tail->flags & FLOW_PENDING)) {
906                         gettimeofday(&now, 0);
907                         timeout.tv_sec = now.tv_sec + unpending_timeout;
908                         pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
909                 }
910
911 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
912                 *logbuf = 0;
913 #endif
914                 if (put_into(pending_tail, COPY_INTO
915 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
916                                 , logbuf
917 #endif
918                         ) < 0) {
919 #if ((DEBUG) & DEBUG_I)
920                         pkts_lost_unpending++;
921 #endif                          
922                 }
923
924 #if ((DEBUG) & DEBUG_U)
925                 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
926 #endif
927
928                 pending_tail->flags = 0;
929                 pending_tail = pending_tail->next;
930 #if ((DEBUG) & DEBUG_I)
931                 pkts_pending_done++;
932 #endif
933         }
934 }
935
936 void *scan_thread()
937 {
938 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
939         char logbuf[256];
940 #endif
941         int i;
942         struct Flow *flow, **flowpp;
943         struct Time now;
944         struct timespec timeout;
945
946         setuser();
947
948         timeout.tv_nsec = 0;
949         pthread_mutex_lock(&scan_mutex);
950
951         for (;;) {
952                 gettime(&now);
953                 timeout.tv_sec = now.sec + scan_interval;
954                 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
955
956                 gettime(&now);
957 #if ((DEBUG) & DEBUG_S)
958                 my_log(LOG_DEBUG, "S: %d", now.sec);
959 #endif
960                 for (i = 0; i < 1 << HASH_BITS ; i++) {
961                         pthread_mutex_lock(&flows_mutex[i]);
962                         flow = flows[i];
963                         flowpp = &flows[i];
964                         while (flow) {
965                                 if (flow->flags & FLOW_FRAG) {
966                                         /* Process fragmented flow */
967                                         if ((now.sec - flow->mtime.sec) > frag_lifetime) {
968                                                 /* Fragmented flow expired - put it into special chain */
969 #if ((DEBUG) & DEBUG_I)
970                                                 flows_fragmented--;
971                                                 flows_total--;
972 #endif
973                                                 *flowpp = flow->next;
974                                                 flow->id = 0;
975                                                 flow->flags &= ~FLOW_FRAG;
976                                                 flow->next = scan_frag_dreg;
977                                                 scan_frag_dreg = flow;
978                                                 flow = *flowpp;
979                                                 continue;
980                                         }
981                                 } else {
982                                         /* Flow is not frgamented */
983                                         if ((now.sec - flow->mtime.sec) > inactive_lifetime
984                                                 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
985                                                 /* Flow expired */
986 #if ((DEBUG) & DEBUG_S)
987                                                 my_log(LOG_DEBUG, "S: E %x", flow);
988 #endif
989 #if ((DEBUG) & DEBUG_I)
990                                                 flows_total--;
991 #endif
992                                                 *flowpp = flow->next;
993                                                 pthread_mutex_lock(&emit_mutex);
994                                                 flow->next = flows_emit;
995                                                 flows_emit = flow;
996 #if ((DEBUG) & DEBUG_I)
997                                                 emit_queue++;
998 #endif                          
999                                                 pthread_mutex_unlock(&emit_mutex);
1000                                                 flow = *flowpp;
1001                                                 continue;
1002                                         }
1003                                 }
1004                                 flowpp = &flow->next;
1005                                 flow = flow->next;
1006                         } /* chain loop */
1007                         pthread_mutex_unlock(&flows_mutex[i]);
1008                 } /* hash loop */
1009                 if (flows_emit) pthread_cond_signal(&emit_cond);
1010
1011                 while (scan_frag_dreg) {
1012                         flow = scan_frag_dreg;
1013                         scan_frag_dreg = flow->next;
1014 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1015                         *logbuf = 0;
1016 #endif
1017                         put_into(flow, MOVE_INTO
1018 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1019                                 , logbuf
1020 #endif
1021                         );
1022 #if ((DEBUG) & DEBUG_S)
1023                         my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
1024 #endif
1025                 }
1026         }
1027 }
1028
1029 void *cap_thread()
1030 {
1031         struct ulog_packet_msg *ulog_msg;
1032         struct ip *nl;
1033         void *tl;
1034         struct Flow *flow;
1035         int len, off_frag, psize;
1036 #if ((DEBUG) & DEBUG_C)
1037         char buf[64];
1038         char logbuf[256];
1039 #endif
1040
1041         setuser();
1042
1043         while (!killed) {
1044                 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1045                 if (len <= 0) {
1046                         my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1047                         continue;
1048                 }
1049                 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1050
1051 #if ((DEBUG) & DEBUG_C)
1052                         sprintf(logbuf, "C: %d", ulog_msg->data_len);
1053 #endif
1054
1055                         nl = (void *) &ulog_msg->payload;
1056                         psize = ulog_msg->data_len;
1057
1058                         /* Sanity check */
1059                         if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1060 #if ((DEBUG) & DEBUG_C)
1061                                 strcat(logbuf, " U");
1062                                 my_log(LOG_DEBUG, "%s", logbuf);
1063 #endif
1064 #if ((DEBUG) & DEBUG_I)
1065                                 pkts_ignored++;
1066 #endif
1067                                 continue;
1068                         }
1069
1070                         if (pending_head->flags) {
1071 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1072                                 my_log(LOG_ERR,
1073 # if ((DEBUG) & DEBUG_C)
1074                                         "%s %s %s", logbuf,
1075 # else
1076                                         "%s %s",
1077 # endif
1078                                         "pending queue full:", "packet lost");
1079 #endif
1080 #if ((DEBUG) & DEBUG_I)
1081                                 pkts_lost_capture++;
1082 #endif
1083                                 goto done;
1084                         }
1085
1086 #if ((DEBUG) & DEBUG_I)
1087                         pkts_total++;
1088 #endif
1089
1090                         flow = pending_head;
1091
1092                         /* ?FIXME? Add sanity check for ip_len? */
1093                         flow->size = ntohs(nl->ip_len);
1094 #if ((DEBUG) & DEBUG_I)
1095                         size_total += flow->size;
1096 #endif
1097
1098                         flow->sip = nl->ip_src;
1099                         flow->dip = nl->ip_dst;
1100                         if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
1101                                 my_log(LOG_INFO, "Received test flow to corewars.org");
1102                         }
1103                         flow->iif = snmp_index(ulog_msg->indev_name);
1104                         flow->oif = snmp_index(ulog_msg->outdev_name);
1105                         flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1106                         flow->proto = nl->ip_p;
1107                         flow->id = 0;
1108                         flow->tcp_flags = 0;
1109                         flow->pkts = 1;
1110                         flow->sizeF = 0;
1111                         flow->sizeP = 0;
1112                         /* Packets captured from OUTPUT table didn't contains valid timestamp */
1113                         if (ulog_msg->timestamp_sec) {
1114                                 flow->ctime.sec = ulog_msg->timestamp_sec;
1115                                 flow->ctime.usec = ulog_msg->timestamp_usec;
1116                         } else gettime(&flow->ctime);
1117                         flow->mtime = flow->ctime;
1118
1119                         off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1120
1121                         /*
1122                         Offset (from network layer) to transport layer header/IP data
1123                         IOW IP header size ;-)
1124
1125                         ?FIXME?
1126                         Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1127                         */
1128                         off_tl = nl->ip_hl << 2;
1129                         tl = (void *) nl + off_tl;
1130
1131                         /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1132                         flow->sizeF = ntohs(nl->ip_len) - off_tl;
1133                         psize -= off_tl;
1134                         if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1135                         if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1136
1137                         if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1138                                 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1139 #if ((DEBUG) & DEBUG_C)
1140                                 strcat(logbuf, " F");
1141 #endif
1142 #if ((DEBUG) & DEBUG_I)
1143                                 pkts_total_fragmented++;
1144 #endif
1145                                 flow->flags |= FLOW_FRAG;
1146                                 flow->id = nl->ip_id;
1147
1148                                 if (!(ntohs(nl->ip_off) & IP_MF)) {
1149                                         /* Packet whith IP_MF contains information about whole datagram size */
1150                                         flow->flags |= FLOW_LASTFRAG;
1151                                         /* size = frag_offset*8 + data_size */
1152                                         flow->sizeP = off_frag + flow->sizeF;
1153                                 }
1154                         }
1155
1156 #if ((DEBUG) & DEBUG_C)
1157                         sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1158                         strcat(logbuf, buf);
1159                         sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1160                         strcat(logbuf, buf);
1161 #endif
1162
1163                         /*
1164                         Fortunately most interesting transport layer information fit
1165                         into first 8 bytes of IP data field (minimal nonzero size).
1166                         Thus we don't need actual packet reassembling to build whole
1167                         transport layer data. We only check the fragment offset for
1168                         zero value to find packet with this information.
1169                         */
1170                         if (!off_frag && psize >= 8) {
1171                                 switch (flow->proto) {
1172                                         case IPPROTO_TCP:
1173                                         case IPPROTO_UDP:
1174                                                 flow->sp = ((struct udphdr *)tl)->uh_sport;
1175                                                 flow->dp = ((struct udphdr *)tl)->uh_dport;
1176                                                 goto tl_known;
1177
1178 #ifdef ICMP_TRICK
1179                                         case IPPROTO_ICMP:
1180                                                 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1181                                                 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1182                                                 goto tl_known;
1183 #endif
1184 #ifdef ICMP_TRICK_CISCO
1185                                         case IPPROTO_ICMP:
1186                                                 flow->dp = *((int32_t *) tl);
1187                                                 goto tl_known;
1188 #endif
1189
1190                                         default:
1191                                                 /* Unknown transport layer */
1192 #if ((DEBUG) & DEBUG_C)
1193                                                 strcat(logbuf, " U");
1194 #endif
1195                                                 flow->sp = 0;
1196                                                 flow->dp = 0;
1197                                                 break;
1198
1199                                         tl_known:
1200 #if ((DEBUG) & DEBUG_C)
1201                                                 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1202                                                 strcat(logbuf, buf);
1203 #endif
1204                                                 flow->flags |= FLOW_TL;
1205                                 }
1206                         }
1207
1208                         /* Check for tcp flags presence (including CWR and ECE). */
1209                         if (flow->proto == IPPROTO_TCP
1210                                 && off_frag < 16
1211                                 && psize >= 16 - off_frag) {
1212                                 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1213 #if ((DEBUG) & DEBUG_C)
1214                                 sprintf(buf, " TCP:%x", flow->tcp_flags);
1215                                 strcat(logbuf, buf);
1216 #endif
1217                         }
1218
1219 #if ((DEBUG) & DEBUG_C)
1220                         sprintf(buf, " => %x", (unsigned) flow);
1221                         strcat(logbuf, buf);
1222                         my_log(LOG_DEBUG, "%s", logbuf);
1223 #endif
1224
1225 #if ((DEBUG) & DEBUG_I)
1226                         pkts_pending++;
1227                         pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1228                         if (pending_queue_trace < pending_queue_trace_candidate)
1229                                 pending_queue_trace = pending_queue_trace_candidate;
1230 #endif
1231
1232                         /* Flow complete - inform unpending_thread() about it */
1233                         pending_head->flags |= FLOW_PENDING;
1234                         pending_head = pending_head->next;
1235                 done:
1236                         pthread_cond_signal(&unpending_cond);
1237                 }
1238         }
1239         return 0;
1240 }
1241
1242 int main(int argc, char **argv)
1243 {
1244         char errpbuf[512];
1245         char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1246         int c, i, write_fd, memory_limit = 0;
1247         struct addrinfo hints, *res;
1248         struct sockaddr_in saddr;
1249         pthread_attr_t tattr;
1250         struct sigaction sigact;
1251         static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1252         struct timeval timeout;
1253
1254         sched_min = sched_get_priority_min(SCHED);
1255         sched_max = sched_get_priority_max(SCHED);
1256
1257         memset(&saddr, 0 , sizeof(saddr));
1258         memset(&hints, 0 , sizeof(hints));
1259         hints.ai_flags = AI_PASSIVE;
1260         hints.ai_family = AF_INET;
1261         hints.ai_socktype = SOCK_DGRAM;
1262
1263         /* Process command line options */
1264
1265         opterr = 0;
1266         while ((c = my_getopt(argc, argv, parms)) != -1) {
1267                 switch (c) {
1268                         case '?':
1269                                 usage();
1270
1271                         case 'h':
1272                                 usage();
1273                 }
1274         }
1275
1276         if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1277         if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1278         if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1279         if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1280         if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1281         if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1282         if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1283         if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1284         if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1285         if (parms[nflag].count) {
1286                 switch (atoi(parms[nflag].arg)) {
1287                         case 1:
1288                                 netflow = &NetFlow1;
1289                                 break;
1290
1291                         case 5:
1292                                 break;
1293
1294                         case 7:
1295                                 netflow = &NetFlow7;
1296                                 break;
1297
1298                         default:
1299                                 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1300                                 exit(1);
1301                 }
1302         }
1303         if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1304         if (parms[lflag].count) {
1305                 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1306                         *log_suffix++ = 0;
1307                         if (*log_suffix) {
1308                                 sprintf(errpbuf, "[%s]", log_suffix);
1309                                 strcat(ident, errpbuf);
1310                         }
1311                 }
1312                 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1313                 if (log_suffix) *--log_suffix = ':';
1314         }
1315         if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1316         err_malloc:
1317                 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1318                 exit(1);
1319         }
1320         sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1321         if (parms[qflag].count) {
1322                 pending_queue_length = atoi(parms[qflag].arg);
1323                 if (pending_queue_length < 1) {
1324                         fprintf(stderr, "Illegal %s\n", "pending queue length");
1325                         exit(1);
1326                 }
1327         }
1328         if (parms[rflag].count) {
1329                 schedp.sched_priority = atoi(parms[rflag].arg);
1330                 if (schedp.sched_priority
1331                         && (schedp.sched_priority < sched_min
1332                                 || schedp.sched_priority > sched_max)) {
1333                         fprintf(stderr, "Illegal %s\n", "realtime priority");
1334                         exit(1);
1335                 }
1336         }
1337         if (parms[Bflag].count) {
1338                 sockbufsize = atoi(parms[Bflag].arg) << 10;
1339         }
1340         if (parms[bflag].count) {
1341                 bulk_quantity = atoi(parms[bflag].arg);
1342                 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1343                         fprintf(stderr, "Illegal %s\n", "bulk size");
1344                         exit(1);
1345                 }
1346         }
1347         if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1348         if (parms[Xflag].count) {
1349                 for(i = 0; parms[Xflag].arg[i]; i++)
1350                         if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1351                 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1352                         goto err_malloc;
1353                 rule = strtok(parms[Xflag].arg, ":");
1354                 for (i = 0; rule; i++) {
1355                         snmp_rules[i].len = strlen(rule);
1356                         if (snmp_rules[i].len > IFNAMSIZ) {
1357                                 fprintf(stderr, "Illegal %s\n", "interface basename");
1358                                 exit(1);
1359                         }
1360                         strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1361                         if (!*(rule - 1)) *(rule - 1) = ',';
1362                         rule = strtok(NULL, ",");
1363                         if (!rule) {
1364                                 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1365                                 exit(1);
1366                         }
1367                         snmp_rules[i].base = atoi(rule);
1368                         *(rule - 1) = ':';
1369                         rule = strtok(NULL, ":");
1370                 }
1371                 nsnmp_rules = i;
1372         }
1373         if (parms[tflag].count)
1374                 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1375         if (parms[aflag].count) {
1376                 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1377                 bad_lhost:
1378                         fprintf(stderr, "Illegal %s\n", "source address");
1379                         exit(1);
1380                 } else {
1381                         saddr = *((struct sockaddr_in *) res->ai_addr);
1382                         freeaddrinfo(res);
1383                 }
1384         }
1385         if (parms[uflag].count) 
1386                 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1387                         fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1388                         exit(1);
1389                 }
1390
1391
1392         /* Process collectors parameters. Brrrr... :-[ */
1393
1394         npeers = argc - optind;
1395         if (npeers > 1) {
1396                 /* Send to remote Netflow collector */
1397                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1398                 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1399                         dhost = argv[i];
1400                         if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1401                         *dport++ = 0;
1402                         if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1403                                 fprintf(stderr, "socket(): %s\n", strerror(errno));
1404                                 exit(1);
1405                         }
1406                         peers[npeers].write_fd = write_fd;
1407                         peers[npeers].type = PEER_MIRROR;
1408                         peers[npeers].laddr = saddr;
1409                         peers[npeers].seq = 0;
1410                         if ((lhost = strchr(dport, '/'))) {
1411                                 *lhost++ = 0;
1412                                 if ((type = strchr(lhost, '/'))) {
1413                                         *type++ = 0;
1414                                         switch (*type) {
1415                                                 case 0:
1416                                                 case 'm':
1417                                                         break;
1418
1419                                                 case 'r':
1420                                                         peers[npeers].type = PEER_ROTATE;
1421                                                         npeers_rot++;
1422                                                         break;
1423
1424                                                 default:
1425                                                         goto bad_collector;
1426                                         }
1427                                 }
1428                                 if (*lhost) {
1429                                         if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1430                                         peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1431                                         freeaddrinfo(res);
1432                                 }
1433                         }
1434                         if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1435                                                 sizeof(struct sockaddr_in))) {
1436                                 fprintf(stderr, "bind(): %s\n", strerror(errno));
1437                                 exit(1);
1438                         }
1439                         if (getaddrinfo(dhost, dport, &hints, &res)) {
1440 bad_collector:
1441                                 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1442                                 exit(1);
1443                         }
1444                         peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1445                         freeaddrinfo(res);
1446                         if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1447                                                 sizeof(struct sockaddr_in))) {
1448                                 fprintf(stderr, "connect(): %s\n", strerror(errno));
1449                                 exit(1);
1450                         }
1451
1452                         /* Restore command line */
1453                         if (type) *--type = '/';
1454                         if (lhost) *--lhost = '/';
1455                         *--dport = ':';
1456                 }
1457         }
1458         else if (parms[fflag].count) {
1459                 // log into a file
1460                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1461                 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1462                 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1463                 
1464                 peers[npeers].write_fd = START_VALUE;
1465                 peers[npeers].type = PEER_FILE;
1466                 peers[npeers].seq = 0;
1467
1468                 get_cur_epoch();
1469                 npeers++;
1470         }
1471         else 
1472                 usage();
1473
1474
1475         if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1476         ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1477         if (!ulog_handle) {
1478                 fprintf(stderr, "libipulog initialization error: %s",
1479                         ipulog_strerror(ipulog_errno));
1480                 exit(1);
1481         }
1482         if (sockbufsize)
1483                 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1484                         &sockbufsize, sizeof(sockbufsize)) < 0)
1485                         fprintf(stderr, "setsockopt(): %s", strerror(errno));
1486
1487         /* Daemonize (if log destination stdout-free) */
1488
1489         my_log_open(ident, verbosity, log_dest);
1490         if (!(log_dest & 2)) {
1491                 /* Crash-proofing - Sapan*/
1492                 while (1) {
1493                         int pid=fork();
1494                         if (pid==-1) {
1495                                         fprintf(stderr, "fork(): %s", strerror(errno));
1496                                         exit(1);
1497                         }
1498                         else if (pid==0) {
1499                                         setsid();
1500                                         freopen("/dev/null", "r", stdin);
1501                                         freopen("/dev/null", "w", stdout);
1502                                         freopen("/dev/null", "w", stderr);
1503                                         break;
1504                         }
1505                         else {
1506                                 while (wait3(NULL,0,NULL) < 1);
1507                         }
1508                 }
1509         } else {
1510                 setvbuf(stdout, (char *)0, _IONBF, 0);
1511                 setvbuf(stderr, (char *)0, _IONBF, 0);
1512         }
1513
1514         pid = getpid();
1515         sprintf(errpbuf, "[%ld]", (long) pid);
1516         strcat(ident, errpbuf);
1517
1518         /* Initialization */
1519
1520         hash_init(); /* Actually for crc16 only */
1521         mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1522         for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1523
1524 #ifdef UPTIME_TRICK
1525         /* Hope 12 days is enough :-/ */
1526         start_time_offset = 1 << 20;
1527
1528         /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1529 #endif
1530         gettime(&start_time);
1531
1532         /*
1533         Build static pending queue as circular buffer.
1534         */
1535         if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1536         pending_tail = pending_head;
1537         for (i = pending_queue_length - 1; i--;) {
1538                 if (!(pending_tail->next = mem_alloc())) {
1539                 err_mem_alloc:
1540                         my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1541                         exit(1);
1542                 }
1543                 pending_tail = pending_tail->next;
1544         }
1545         pending_tail->next = pending_head;
1546         pending_tail = pending_head;
1547
1548         sigemptyset(&sig_mask);
1549         sigact.sa_handler = &sighandler;
1550         sigact.sa_mask = sig_mask;
1551         sigact.sa_flags = 0;
1552         sigaddset(&sig_mask, SIGTERM);
1553         sigaction(SIGTERM, &sigact, 0);
1554 #if ((DEBUG) & DEBUG_I)
1555         sigaddset(&sig_mask, SIGUSR1);
1556         sigaction(SIGUSR1, &sigact, 0);
1557 #endif
1558         if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1559                 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1560                 exit(1);
1561         }
1562
1563         my_log(LOG_INFO, "Starting %s...", VERSION);
1564
1565         if (parms[cflag].count) {
1566                 if (chdir(parms[cflag].arg) || chroot(".")) {
1567                         my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1568                         exit(1);
1569                 }
1570         }
1571
1572         schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1573         pthread_attr_init(&tattr);
1574         for (i = 0; i < THREADS - 1; i++) {
1575                 if (schedp.sched_priority > 0) {
1576                         if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1577                                 (pthread_attr_setschedparam(&tattr, &schedp))) {
1578                                 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1579                                 exit(1);
1580                         }
1581                 }
1582                 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1583                         my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1584                         exit(1);
1585                 }
1586                 pthread_detach(thid);
1587                 schedp.sched_priority++;
1588         }
1589
1590         if (pw) {
1591                 if (setgroups(0, NULL)) {
1592                         my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1593                         exit(1);
1594                 }
1595                 if (setregid(pw->pw_gid, pw->pw_gid)) {
1596                         my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1597                         exit(1);
1598                 }
1599                 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1600                         my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1601                         exit(1);
1602                 }
1603         }
1604
1605         if (!(pidfile = fopen(pidfilepath, "w")))
1606                 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1607         else {
1608                 fprintf(pidfile, "%ld\n", (long) pid);
1609                 fclose(pidfile);
1610         }
1611
1612         my_log(LOG_INFO, "pid: %d", pid);
1613         my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1614                 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1615                 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1616                 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1617                 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1618                 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1619                 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1620         for (i = 0; i < nsnmp_rules; i++) {
1621                 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1622                         i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1623         }
1624         for (i = 0; i < npeers; i++) {
1625                 switch (peers[i].type) {
1626                         case PEER_MIRROR:
1627                                 c = 'm';
1628                                 break;
1629                         case PEER_ROTATE:
1630                                 c = 'r';
1631                                 break;
1632                 }
1633                 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1634                 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1635                         inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1636         }
1637
1638         pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1639
1640         timeout.tv_usec = 0;
1641         while (!killed
1642                 || (total_elements - free_elements - pending_queue_length)
1643                 || emit_count
1644                 || pending_tail->flags) {
1645
1646                 if (!sigs) {
1647                         timeout.tv_sec = scan_interval;
1648                         select(0, 0, 0, 0, &timeout);
1649                 }
1650
1651                 if (sigs & SIGTERM_MASK && !killed) {
1652                         sigs &= ~SIGTERM_MASK;
1653                         my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1654                         scan_interval = 1;
1655                         frag_lifetime = -1;
1656                         active_lifetime = -1;
1657                         inactive_lifetime = -1;
1658                         emit_timeout = 1;
1659                         unpending_timeout = 1;
1660                         killed = 1;
1661                         pthread_cond_signal(&scan_cond);
1662                         pthread_cond_signal(&unpending_cond);
1663                 }
1664
1665 #if ((DEBUG) & DEBUG_I)
1666                 if (sigs & SIGUSR1_MASK) {
1667                         sigs &= ~SIGUSR1_MASK;
1668                         info_debug();
1669                 }
1670 #endif
1671         }
1672         remove(pidfilepath);
1673 #if ((DEBUG) & DEBUG_I)
1674         info_debug();
1675 #endif
1676         my_log(LOG_INFO, "Done.");
1677 #ifdef WALL
1678         return 0;
1679 #endif
1680 }