Decrease granularity to reduce the size of the dataset
[fprobe-ulog.git] / src / fprobe-ulog.c
1 /*
2         Copyright (C) Slava Astashonok <sla@0n.ru>
3
4         This program is free software; you can redistribute it and/or
5         modify it under the terms of the GNU General Public License.
6
7         $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
8
9                         Sapan Bhatia <sapanb@cs.princeton.edu> 
10                         
11         7/11/2007       Added data collection (-f) functionality, xid support in the header and log file
12                         rotation.
13
14         15/11/2007      Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
15
16 */
17
18 #include <common.h>
19
20 /* stdout, stderr, freopen() */
21 #include <stdio.h>
22
23 /* atoi(), exit() */
24 #include <stdlib.h>
25
26 /* getopt(), alarm(), getpid(), setsid(), chdir() */
27 #include <unistd.h>
28
29 /* strerror() */
30 #include <string.h>
31
32 /* sig*() */
33 #include <signal.h>
34
35 /* statfs() */
36
37 #include <sys/vfs.h>
38
39 #include <libipulog/libipulog.h>
/*
 * Handle layout for libipulog, spelled out here (right after including
 * libipulog.h) so the probe can reach the handle's fields directly.
 * NOTE(review): presumably this mirrors libipulog's private definition
 * field-for-field; it must be kept in sync with the library version in use,
 * because any layout mismatch silently corrupts every field access.
 */
struct ipulog_handle {
	int fd;				/* descriptor of the netlink socket (assumed) */
	u_int8_t blocking;
	struct sockaddr_nl local, peer;	/* local and peer netlink addresses */
	struct nlmsghdr *last_nlhdr;
};
47
48 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
49 #include <sys/types.h>
50 #include <netinet/in_systm.h>
51 #include <sys/socket.h>
52 #include <netinet/in.h>
53 #include <arpa/inet.h>
54 #include <netinet/ip.h>
55 #include <netinet/tcp.h>
56 #include <netinet/udp.h>
57 #include <netinet/ip_icmp.h>
58 #include <net/if.h>
59
60 #include <sys/param.h>
61 #include <pwd.h>
62 #ifdef OS_LINUX
63 #include <grp.h>
64 #endif
65
66 /* pthread_*() */
67 #include <pthread.h>
68
69 /* errno */
70 #include <errno.h>
71
72 /* getaddrinfo() */
73 #include <netdb.h>
74
75 /* nanosleep() */
76 #include <time.h>
77
78 /* gettimeofday() */
79 #include <sys/time.h>
80
81 /* scheduling */
82 #include <sched.h>
83
84 /* select() (POSIX)*/
85 #include <sys/select.h>
86
87 /* open() */
88 #include <sys/stat.h>
89 #include <fcntl.h>
90
91 #include <fprobe-ulog.h>
92 #include <my_log.h>
93 #include <my_getopt.h>
94 #include <netflow.h>
95 #include <hash.h>
96 #include <mem.h>
97
/* Path of the PID file (presumably written by main(), not visible here). */
#define PIDFILE "/var/log/fprobe-ulog.pid"
/* When defined, emit_thread() pads every PDU up to NETFLOW_PDU_SIZE bytes. */
#define STD_NETFLOW_PDU

/*
 * Index of each command-line option inside parms[].  Code reads option
 * state as parms[<letter>flag] (e.g. mark_is_tos -> parms[Mflag].count), so
 * this list and parms[] must contain the same options in the same order.
 */
enum {
	aflag, Bflag, bflag, cflag, dflag, Dflag,
	eflag, Eflag, fflag, gflag, hflag, lflag,
	mflag, Mflag, nflag, qflag, rflag, sflag,
	tflag, Tflag, Uflag, uflag, vflag, Wflag,
	Xflag
};
128
129 static struct getopt_parms parms[] = {
130         {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
131         {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
132         {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
133         {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
134         {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
135         {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
136         {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
137         {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
138         {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
139         {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
140         {'h', 0, 0, 0},
141         {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
142         {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
143         {'M', 0, 0, 0},
144         {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
145         {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
146         {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
147         {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
148         {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
149         {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
150         {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
151         {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
152         {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
153         {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
154         {0, 0, 0, 0}
155 };
156
/* getopt() state (also declared by <unistd.h>). */
extern char *optarg;
extern int optind, opterr, optopt;

/*
 * errno is deliberately not redeclared: <errno.h> provides it and it may be a
 * macro, so an "extern int errno;" declaration is undefined behaviour.
 */

/* NetFlow v1/v5/v7 format descriptors, defined in another translation unit. */
extern struct NetFlow NetFlow1;
extern struct NetFlow NetFlow5;
extern struct NetFlow NetFlow7;
164
/* Sentinel fd meaning "no log file open yet"; get_log_fd() skips its disk check for it. */
#define START_VALUE (-5)
/* Option state of -M ("use netfilter mark value as ToS flag"); presumably nonzero when given. */
#define mark_is_tos parms[Mflag].count

/* Timing and resource knobs; the defaults match those printed by usage(). */
static unsigned scan_interval = 5;	/* seconds between expiry scans (cf. -s) */
static int min_free = 0;		/* disk blocks to keep free; 0 disables the check (cf. -D) */
static int frag_lifetime = 30;		/* fragmented flow lifetime, seconds (cf. -g) */
static int inactive_lifetime = 60;	/* inactive timer, seconds */
static int active_lifetime = 300;	/* active timer, seconds (cf. -e) */
static int sockbufsize;

/* Upper bound shown for -b: (mem_index_t)-1, i.e. its maximum if mem_index_t is unsigned. */
#define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
#if (MEM_BITS == 0) || (MEM_BITS == 16)
#define BULK_QUANTITY 10000
#else
#define BULK_QUANTITY 200
#endif

/* Log-file rotation state used by get_log_fd(). */
static unsigned epoch_length = 60;	/* minutes per epoch */
static unsigned log_epochs = 1;		/* number of "<file>.<n>" files cycled through */
static unsigned cur_epoch = 0;		/* index of the file being written */
static unsigned prev_uptime = 0;	/* uptime in minutes at the last switch */
182
183 static unsigned bulk_quantity = BULK_QUANTITY;
184 static unsigned pending_queue_length = 100;
185 static struct NetFlow *netflow = &NetFlow5;
186 static unsigned verbosity = 6;
187 static unsigned log_dest = MY_LOG_SYSLOG;
188 static struct Time start_time;
189 static long start_time_offset;
190 static int off_tl;
191 /* From mem.c */
192 extern unsigned total_elements;
193 extern unsigned free_elements;
194 extern unsigned total_memory;
195 #if ((DEBUG) & DEBUG_I)
196 static unsigned emit_pkts, emit_queue;
197 static uint64_t size_total;
198 static unsigned pkts_total, pkts_total_fragmented;
199 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
200 static unsigned pkts_pending, pkts_pending_done;
201 static unsigned pending_queue_trace, pending_queue_trace_candidate;
202 static unsigned flows_total, flows_fragmented;
203 #endif
204 static unsigned emit_count;
205 static uint32_t emit_sequence;
206 static unsigned emit_rate_bytes, emit_rate_delay;
207 static struct Time emit_time;
208 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
209 static pthread_t thid;
210 static sigset_t sig_mask;
211 static struct sched_param schedp;
212 static int sched_min, sched_max;
213 static int npeers, npeers_rot;
214 static struct peer *peers;
215 static int sigs;
216
217 static struct Flow *flows[1 << HASH_BITS];
218 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
219
220 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
221 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
222
223 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
224 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
225 static struct Flow *pending_head, *pending_tail;
226 static struct Flow *scan_frag_dreg;
227
228 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
229 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
230 static struct Flow *flows_emit;
231
232 static char ident[256] = "fprobe-ulog";
233 static FILE *pidfile;
234 static char *pidfilepath;
235 static pid_t pid;
236 static int killed;
237 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
238 static struct ipulog_handle *ulog_handle;
239 static uint32_t ulog_gmask = 1;
240 static char *cap_buf;
241 static int nsnmp_rules;
242 static struct snmp_rule *snmp_rules;
243 static struct passwd *pw = 0;
244
245 void usage()
246 {
247         fprintf(stdout,
248                 "fprobe-ulog: a NetFlow probe. Version %s\n"
249                 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
250                 "\n"
251                 "-h\t\tDisplay this help\n"
252                 "-U <mask>\tULOG group bitwise mask [1]\n"
253                 "-s <seconds>\tHow often scan for expired flows [5]\n"
254                 "-g <seconds>\tFragmented flow lifetime [30]\n"
255                 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
256                 "-f <filename>\tLog flow data in a file\n"
257                 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
258                 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
259                 "-a <address>\tUse <address> as source for NetFlow flow\n"
260                 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
261                 "-M\t\tUse netfilter mark value as ToS flag\n"
262                 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
263                 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
264                 "-q <flows>\tPending queue length [100]\n"
265                 "-B <kilobytes>\tKernel capture buffer size [0]\n"
266                 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
267                 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
268                 "-c <directory>\tDirectory to chroot to\n"
269                 "-u <user>\tUser to run as\n"
270                 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
271                 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
272                 "-y <remote:port>\tAddress of the NetFlow collector\n"
273                 "-f <writable file>\tFile to write data into\n"
274                 "-T <n>\tRotate log file every n epochs\n"
275                 "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
276                 "-E <[1..60]>\tSize of an epoch in minutes\n"
277                 "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
278                 ,
279                 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
280         exit(0);
281 }
282
#if ((DEBUG) & DEBUG_I)
/*
 * Dump the DEBUG_I statistics counters at LOG_DEBUG level.
 * All counters are unsigned, so they are printed with %u; size_total is a
 * uint64_t and is widened to unsigned long long for a portable %llu
 * (assumes my_log() follows printf conventions).
 */
void info_debug()
{
	my_log(LOG_DEBUG, "I: received:%u/%u (%llu) pending:%u/%u",
		pkts_total, pkts_total_fragmented, (unsigned long long) size_total,
		pkts_pending - pkts_pending_done, pending_queue_trace);
	my_log(LOG_DEBUG, "I: ignored:%u lost:%u+%u",
		pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
	my_log(LOG_DEBUG, "I: cache:%u/%u emit:%u/%u/%u",
		flows_total, flows_fragmented, (unsigned) emit_sequence, emit_pkts, emit_queue);
	my_log(LOG_DEBUG, "I: memory:%u/%u (%u)",
		total_elements, free_elements, total_memory);
}
#endif
297
298 void sighandler(int sig)
299 {
300         switch (sig) {
301                 case SIGTERM:
302                         sigs |= SIGTERM_MASK;
303                         break;
304 #if ((DEBUG) & DEBUG_I)
305                 case SIGUSR1:
306                         sigs |= SIGUSR1_MASK;
307                         break;
308 #endif
309         }
310 }
311
312 void gettime(struct Time *now)
313 {
314         struct timeval t;
315
316         gettimeofday(&t, 0);
317         now->sec = t.tv_sec;
318         now->usec = t.tv_usec;
319 }
320
321
322 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
323 {
324         return (t1->sec - t2->sec)/60;
325 }
326
327 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
328 {
329         return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
330 }
331
332 /* Uptime in miliseconds */
333 uint32_t getuptime(struct Time *t)
334 {
335         /* Maximum uptime is about 49/2 days */
336         return cmpmtime(t, &start_time);
337 }
338
339 /* Uptime in minutes */
340 uint32_t getuptime_minutes(struct Time *t)
341 {
342         /* Maximum uptime is about 49/2 days */
343         return cmpMtime(t, &start_time);
344 }
345
346 hash_t hash_flow(struct Flow *flow)
347 {
348         if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
349         else return hash(flow, sizeof(struct Flow_TL));
350 }
351
352 uint16_t snmp_index(char *name) {
353         uint32_t i;
354
355         if (!*name) return 0;
356
357         for (i = 0; (int) i < nsnmp_rules; i++) {
358                 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
359                 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
360         }
361
362         if ((i = if_nametoindex(name))) return i;
363
364         return -1;
365 }
366
367 inline void copy_flow(struct Flow *src, struct Flow *dst)
368 {
369         dst->iif = src->iif;
370         dst->oif = src->oif;
371         dst->sip = src->sip;
372         dst->dip = src->dip;
373         dst->tos = src->tos;
374         dst->proto = src->proto;
375         dst->tcp_flags = src->tcp_flags;
376         dst->id = src->id;
377         dst->sp = src->sp;
378         dst->dp = src->dp;
379         dst->pkts = src->pkts;
380         dst->size = src->size;
381         dst->sizeF = src->sizeF;
382         dst->sizeP = src->sizeP;
383         dst->ctime = src->ctime;
384         dst->mtime = src->mtime;
385         dst->flags = src->flags;
386 }
387
388 void get_cur_epoch() {
389         int fd;
390         fd = open("/tmp/fprobe_last_epoch",O_RDONLY);
391         if (fd != -1) {
392                 char snum[7];
393                 ssize_t len;
394                 len = read(fd, snum, sizeof(snum)-1);
395                 if (len != -1) {
396                         snum[len]='\0';
397                         sscanf(snum,"%d",&cur_epoch);
398                         cur_epoch++; /* Let's not stone the last epoch */
399                         close(fd);
400                 }
401         }
402         return;
403 }
404
405
406 void update_cur_epoch_file(int n) {
407         int fd, len;
408         char snum[7];
409         len=snprintf(snum,6,"%d",n);
410         fd = open("/tmp/fprobe_last_epoch",O_WRONLY|O_CREAT|O_TRUNC);
411         if (fd == -1) {
412                 my_log(LOG_ERR, "open() failed: /tmp/fprobe_last_epoch.The next restart will resume logging from epoch id 0.");
413                 return;
414         }
415         write(fd, snum, len);
416         close(fd);
417 }
418
419 unsigned get_log_fd(char *fname, int cur_fd) {
420         struct Time now;
421         unsigned cur_uptime;
422         /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
423          * doesn't solve the problem */
424
425         struct statfs statfs;
426         int ret_fd;
427         gettime(&now);
428         cur_uptime = getuptime_minutes(&now);
429
430
431         if (cur_fd!=START_VALUE) {
432                 if (fstatfs(cur_fd, &statfs) == -1) {
433                         my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
434                 }
435                 else {
436                         if (min_free && (statfs.f_bavail < min_free)) 
437                                 switch(cur_epoch) {
438                                         case 0: /* Uh oh. Our first file filled up all of the free space. Just bail out. */
439                                                 my_log(LOG_ERR, "The first epoch filled up all the free space on disk. Bailing out.");
440                                                 exit(1);
441                                         default:
442                                                 my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
443                                                 cur_epoch = -1;
444                                 }
445                 }
446         }
447
448         /* Epoch length in minutes */
449         if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
450                 char nextname[MAX_PATH_LEN];
451                 int write_fd;
452                 prev_uptime = cur_uptime;
453                 cur_epoch = (cur_epoch + 1) % log_epochs;
454                 if (cur_fd>0)
455                         close(cur_fd);
456                 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
457                 if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC)) < 0) {
458                         my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
459                         exit(1);
460                 }
461                 update_cur_epoch_file(cur_epoch);
462                 ret_fd = write_fd;
463         }
464         else {
465                 ret_fd = cur_fd;
466         }
467         return(ret_fd);
468 }
469
470 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
471 {
472         struct Flow **flowpp;
473
474 #ifdef WALL
475         flowpp = 0;
476 #endif
477
478         if (prev) flowpp = *prev;
479
480         while (where) {
481                 if (where->sip.s_addr == what->sip.s_addr
482                         && where->dip.s_addr == what->dip.s_addr
483                         && where->proto == what->proto) {
484                         switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
485                                 case 0:
486                                         /* Both unfragmented */
487                                         if ((what->sp == where->sp)
488                                                 && (what->dp == where->dp)) goto done;
489                                         break;
490                                 case 2:
491                                         /* Both fragmented */
492                                         if (where->id == what->id) goto done;
493                                         break;
494                         }
495                 }
496                 flowpp = &where->next;
497                 where = where->next;
498         }
499 done:
500         if (prev) *prev = flowpp;
501         return where;
502 }
503
504 int put_into(struct Flow *flow, int flag
505 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
506         , char *logbuf
507 #endif
508 )
509 {
510         int ret = 0;
511         hash_t h;
512         struct Flow *flown, **flowpp;
513 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
514         char buf[64];
515 #endif
516
517         h = hash_flow(flow);
518 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
519         sprintf(buf, " %x H:%04x", (unsigned) flow, h);
520         strcat(logbuf, buf);
521 #endif
522         pthread_mutex_lock(&flows_mutex[h]);
523         flowpp = &flows[h];
524         if (!(flown = find(flows[h], flow, &flowpp))) {
525                 /* No suitable flow found - add */
526                 if (flag == COPY_INTO) {
527                         if ((flown = mem_alloc())) {
528                                 copy_flow(flow, flown);
529                                 flow = flown;
530                         } else {
531 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
532                                 my_log(LOG_ERR, "%s %s. %s",
533                                         "mem_alloc():", strerror(errno), "packet lost");
534 #endif
535                                 return -1;
536                         }
537                 }
538                 flow->next = flows[h];
539                 flows[h] = flow;
540 #if ((DEBUG) & DEBUG_I)
541                 flows_total++;
542                 if (flow->flags & FLOW_FRAG) flows_fragmented++;
543 #endif
544 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
545                 if (flown) {
546                         sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
547                         strcat(logbuf, buf);
548                 }
549 #endif
550         } else {
551                 /* Found suitable flow - update */
552 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
553                 sprintf(buf, " +> %x", (unsigned) flown);
554                 strcat(logbuf, buf);
555 #endif
556                 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
557                         flown->mtime = flow->mtime;
558                 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
559                         flown->ctime = flow->ctime;
560                 flown->tcp_flags |= flow->tcp_flags;
561                 flown->size += flow->size;
562                 flown->pkts += flow->pkts;
563                 if (flow->flags & FLOW_FRAG) {
564                         /* Fragmented flow require some additional work */
565                         if (flow->flags & FLOW_TL) {
566                                 /*
567                                 ?FIXME?
568                                 Several packets with FLOW_TL (attack)
569                                 */
570                                 flown->sp = flow->sp;
571                                 flown->dp = flow->dp;
572                         }
573                         if (flow->flags & FLOW_LASTFRAG) {
574                                 /*
575                                 ?FIXME?
576                                 Several packets with FLOW_LASTFRAG (attack)
577                                 */
578                                 flown->sizeP = flow->sizeP;
579                         }
580                         flown->flags |= flow->flags;
581                         flown->sizeF += flow->sizeF;
582                         if ((flown->flags & FLOW_LASTFRAG)
583                                 && (flown->sizeF >= flown->sizeP)) {
584                                 /* All fragments received - flow reassembled */
585                                 *flowpp = flown->next;
586                                 pthread_mutex_unlock(&flows_mutex[h]);
587 #if ((DEBUG) & DEBUG_I)
588                                 flows_total--;
589                                 flows_fragmented--;
590 #endif
591                                 flown->id = 0;
592                                 flown->flags &= ~FLOW_FRAG;
593 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
594                                 strcat(logbuf," R");
595 #endif
596                                 ret = put_into(flown, MOVE_INTO
597 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
598                                                 , logbuf
599 #endif
600                                         );
601                         }
602                 }
603                 if (flag == MOVE_INTO) mem_free(flow);
604         }
605         pthread_mutex_unlock(&flows_mutex[h]);
606         return ret;
607 }
608
609 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
610 {
611         int i;
612
613         for (i = 0; i < fields; i++) {
614 #if ((DEBUG) & DEBUG_F)
615                 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
616 #endif
617                 switch (format[i]) {
618                         case NETFLOW_IPV4_SRC_ADDR:
619                                 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
620                                 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
621                                 break;
622
623                         case NETFLOW_IPV4_DST_ADDR:
624                                 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
625                                 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
626                                         my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
627                                 }
628                                 p += NETFLOW_IPV4_DST_ADDR_SIZE;
629                                 break;
630
631                         case NETFLOW_INPUT_SNMP:
632                                 *((uint16_t *) p) = htons(flow->iif);
633                                 p += NETFLOW_INPUT_SNMP_SIZE;
634                                 break;
635
636                         case NETFLOW_OUTPUT_SNMP:
637                                 *((uint16_t *) p) = htons(flow->oif);
638                                 p += NETFLOW_OUTPUT_SNMP_SIZE;
639                                 break;
640
641                         case NETFLOW_PKTS_32:
642                                 *((uint32_t *) p) = htonl(flow->pkts);
643                                 p += NETFLOW_PKTS_32_SIZE;
644                                 break;
645
646                         case NETFLOW_BYTES_32:
647                                 *((uint32_t *) p) = htonl(flow->size);
648                                 p += NETFLOW_BYTES_32_SIZE;
649                                 break;
650
651                         case NETFLOW_FIRST_SWITCHED:
652                                 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
653                                 p += NETFLOW_FIRST_SWITCHED_SIZE;
654                                 break;
655
656                         case NETFLOW_LAST_SWITCHED:
657                                 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
658                                 p += NETFLOW_LAST_SWITCHED_SIZE;
659                                 break;
660
661                         case NETFLOW_L4_SRC_PORT:
662                                 *((uint16_t *) p) = flow->sp;
663                                 p += NETFLOW_L4_SRC_PORT_SIZE;
664                                 break;
665
666                         case NETFLOW_L4_DST_PORT:
667                                 *((uint16_t *) p) = flow->dp;
668                                 p += NETFLOW_L4_DST_PORT_SIZE;
669                                 break;
670
671                         case NETFLOW_PROT:
672                                 *((uint8_t *) p) = flow->proto;
673                                 p += NETFLOW_PROT_SIZE;
674                                 break;
675
676                         case NETFLOW_SRC_TOS:
677                                 *((uint8_t *) p) = flow->tos;
678                                 p += NETFLOW_SRC_TOS_SIZE;
679                                 break;
680
681                         case NETFLOW_TCP_FLAGS:
682                                 *((uint8_t *) p) = flow->tcp_flags;
683                                 p += NETFLOW_TCP_FLAGS_SIZE;
684                                 break;
685
686                         case NETFLOW_VERSION:
687                                 *((uint16_t *) p) = htons(netflow->Version);
688                                 p += NETFLOW_VERSION_SIZE;
689                                 break;
690
691                         case NETFLOW_COUNT:
692                                 *((uint16_t *) p) = htons(emit_count);
693                                 p += NETFLOW_COUNT_SIZE;
694                                 break;
695
696                         case NETFLOW_UPTIME:
697                                 *((uint32_t *) p) = htonl(getuptime(&emit_time));
698                                 p += NETFLOW_UPTIME_SIZE;
699                                 break;
700
701                         case NETFLOW_UNIX_SECS:
702                                 *((uint32_t *) p) = htonl(emit_time.sec);
703                                 p += NETFLOW_UNIX_SECS_SIZE;
704                                 break;
705
706                         case NETFLOW_UNIX_NSECS:
707                                 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
708                                 p += NETFLOW_UNIX_NSECS_SIZE;
709                                 break;
710
711                         case NETFLOW_FLOW_SEQUENCE:
712                                 //*((uint32_t *) p) = htonl(emit_sequence);
713                                 *((uint32_t *) p) = 0;
714                                 p += NETFLOW_FLOW_SEQUENCE_SIZE;
715                                 break;
716
717                         case NETFLOW_PAD8:
718                         /* Unsupported (uint8_t) */
719                         case NETFLOW_ENGINE_TYPE:
720                         case NETFLOW_ENGINE_ID:
721                         case NETFLOW_FLAGS7_1:
722                         case NETFLOW_SRC_MASK:
723                         case NETFLOW_DST_MASK:
724                                 *((uint8_t *) p) = 0;
725                                 p += NETFLOW_PAD8_SIZE;
726                                 break;
727                         case NETFLOW_XID:
728                                 *((uint16_t *) p) = flow->tos;
729                                 p += NETFLOW_XID_SIZE;
730                                 break;
731                         case NETFLOW_PAD16:
732                         /* Unsupported (uint16_t) */
733                         case NETFLOW_SRC_AS:
734                         case NETFLOW_DST_AS:
735                         case NETFLOW_FLAGS7_2:
736                                 *((uint16_t *) p) = 0;
737                                 p += NETFLOW_PAD16_SIZE;
738                                 break;
739
740                         case NETFLOW_PAD32:
741                         /* Unsupported (uint32_t) */
742                         case NETFLOW_IPV4_NEXT_HOP:
743                         case NETFLOW_ROUTER_SC:
744                                 *((uint32_t *) p) = 0;
745                                 p += NETFLOW_PAD32_SIZE;
746                                 break;
747
748                         default:
749                                 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
750                                         format, i, format[i]);
751                                 exit(1);
752                 }
753         }
754 #if ((DEBUG) & DEBUG_F)
755         my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
756 #endif
757         return p;
758 }
759
760 void setuser() {
761         /*
762         Workaround for clone()-based threads
763         Try to change EUID independently of main thread
764         */
765         if (pw) {
766                 setgroups(0, NULL);
767                 setregid(pw->pw_gid, pw->pw_gid);
768                 setreuid(pw->pw_uid, pw->pw_uid);
769         }
770 }
771
772 void *emit_thread()
773 {
774         struct Flow *flow;
775         void *p;
776         struct timeval now;
777         struct timespec timeout;
778         int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
779
780         p = (void *) &emit_packet + netflow->HeaderSize;
781         timeout.tv_nsec = 0;
782
783         setuser();
784
785         for (;;) {
786                 pthread_mutex_lock(&emit_mutex);
787                 while (!flows_emit) {
788                         gettimeofday(&now, 0);
789                         timeout.tv_sec = now.tv_sec + emit_timeout;
790                         /* Do not wait until emit_packet will filled - it may be too long */
791                         if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
792                                 pthread_mutex_unlock(&emit_mutex);
793                                 goto sendit;
794                         }
795                 }
796                 flow = flows_emit;
797                 flows_emit = flows_emit->next;
798 #if ((DEBUG) & DEBUG_I)
799                 emit_queue--;
800 #endif          
801                 pthread_mutex_unlock(&emit_mutex);
802
803 #ifdef UPTIME_TRICK
804                 if (!emit_count) {
805                         gettime(&start_time);
806                         start_time.sec -= start_time_offset;
807                 }
808 #endif
809                 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
810                 mem_free(flow);
811                 emit_count++;
812 #ifdef PF2_DEBUG
813                 printf("Emit count = %d\n", emit_count);
814                 fflush(stdout);
815 #endif
816                 if (emit_count == netflow->MaxFlows) {
817                 sendit:
818                         gettime(&emit_time);
819                         p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
820                         size = netflow->HeaderSize + emit_count * netflow->FlowSize;
821                         /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
822 #ifdef STD_NETFLOW_PDU
823                         if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
824 #endif
825                         peer_rot_cur = 0;
826                         for (i = 0; i < npeers; i++) {
827                                 if (peers[i].type == PEER_FILE) {
828                                                 if (netflow->SeqOffset)
829                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
830                                                 peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
831                                                 ret = write(peers[i].write_fd, emit_packet, size);
832                                                 if (ret < size) {
833
834 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
835                                                         my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
836                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
837 #endif
838 #undef MESSAGES
839                                                 }
840 #if ((DEBUG) & DEBUG_E)
841                                                 commaneelse {
842                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
843                                                                 emit_count, i + 1, peers[i].seq);
844                                                 }
845 #endif
846                                                 peers[i].seq += emit_count;
847
848                                                 /* Rate limit */
849                                                 if (emit_rate_bytes) {
850                                                         sent += size;
851                                                         delay = sent / emit_rate_bytes;
852                                                         if (delay) {
853                                                                 sent %= emit_rate_bytes;
854                                                                 timeout.tv_sec = 0;
855                                                                 timeout.tv_nsec = emit_rate_delay * delay;
856                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
857                                                         }
858                                                 }
859                                         }
860                                 else
861                                 if (peers[i].type == PEER_MIRROR) goto sendreal;
862                                 else
863                                 if (peers[i].type == PEER_ROTATE) 
864                                         if (peer_rot_cur++ == peer_rot_work) {
865                                         sendreal:
866                                                 if (netflow->SeqOffset)
867                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
868                                                 ret = send(peers[i].write_fd, emit_packet, size, 0);
869                                                 if (ret < size) {
870 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
871                                                         my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
872                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
873 #endif
874                                                 }
875 #if ((DEBUG) & DEBUG_E)
876                                                 commaneelse {
877                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
878                                                                 emit_count, i + 1, peers[i].seq);
879                                                 }
880 #endif
881                                                 peers[i].seq += emit_count;
882
883                                                 /* Rate limit */
884                                                 if (emit_rate_bytes) {
885                                                         sent += size;
886                                                         delay = sent / emit_rate_bytes;
887                                                         if (delay) {
888                                                                 sent %= emit_rate_bytes;
889                                                                 timeout.tv_sec = 0;
890                                                                 timeout.tv_nsec = emit_rate_delay * delay;
891                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
892                                                         }
893                                                 }
894                                         }
895                         }
896                         if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
897                         emit_sequence += emit_count;
898                         emit_count = 0;
899 #if ((DEBUG) & DEBUG_I)
900                         emit_pkts++;
901 #endif
902                 }
903         }
904 }       
905
906 void *unpending_thread()
907 {
908         struct timeval now;
909         struct timespec timeout;
910 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
911         char logbuf[256];
912 #endif
913
914         setuser();
915
916         timeout.tv_nsec = 0;
917         pthread_mutex_lock(&unpending_mutex);
918
919         for (;;) {
920                 while (!(pending_tail->flags & FLOW_PENDING)) {
921                         gettimeofday(&now, 0);
922                         timeout.tv_sec = now.tv_sec + unpending_timeout;
923                         pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
924                 }
925
926 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
927                 *logbuf = 0;
928 #endif
929                 if (put_into(pending_tail, COPY_INTO
930 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
931                                 , logbuf
932 #endif
933                         ) < 0) {
934 #if ((DEBUG) & DEBUG_I)
935                         pkts_lost_unpending++;
936 #endif                          
937                 }
938
939 #if ((DEBUG) & DEBUG_U)
940                 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
941 #endif
942
943                 pending_tail->flags = 0;
944                 pending_tail = pending_tail->next;
945 #if ((DEBUG) & DEBUG_I)
946                 pkts_pending_done++;
947 #endif
948         }
949 }
950
951 void *scan_thread()
952 {
953 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
954         char logbuf[256];
955 #endif
956         int i;
957         struct Flow *flow, **flowpp;
958         struct Time now;
959         struct timespec timeout;
960
961         setuser();
962
963         timeout.tv_nsec = 0;
964         pthread_mutex_lock(&scan_mutex);
965
966         for (;;) {
967                 gettime(&now);
968                 timeout.tv_sec = now.sec + scan_interval;
969                 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
970
971                 gettime(&now);
972 #if ((DEBUG) & DEBUG_S)
973                 my_log(LOG_DEBUG, "S: %d", now.sec);
974 #endif
975                 for (i = 0; i < 1 << HASH_BITS ; i++) {
976                         pthread_mutex_lock(&flows_mutex[i]);
977                         flow = flows[i];
978                         flowpp = &flows[i];
979                         while (flow) {
980                                 if (flow->flags & FLOW_FRAG) {
981                                         /* Process fragmented flow */
982                                         if ((now.sec - flow->mtime.sec) > frag_lifetime) {
983                                                 /* Fragmented flow expired - put it into special chain */
984 #if ((DEBUG) & DEBUG_I)
985                                                 flows_fragmented--;
986                                                 flows_total--;
987 #endif
988                                                 *flowpp = flow->next;
989                                                 flow->id = 0;
990                                                 flow->flags &= ~FLOW_FRAG;
991                                                 flow->next = scan_frag_dreg;
992                                                 scan_frag_dreg = flow;
993                                                 flow = *flowpp;
994                                                 continue;
995                                         }
996                                 } else {
997                                         /* Flow is not frgamented */
998                                         if ((now.sec - flow->mtime.sec) > inactive_lifetime
999                                                 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
1000                                                 /* Flow expired */
1001 #if ((DEBUG) & DEBUG_S)
1002                                                 my_log(LOG_DEBUG, "S: E %x", flow);
1003 #endif
1004 #if ((DEBUG) & DEBUG_I)
1005                                                 flows_total--;
1006 #endif
1007                                                 *flowpp = flow->next;
1008                                                 pthread_mutex_lock(&emit_mutex);
1009                                                 flow->next = flows_emit;
1010                                                 flows_emit = flow;
1011 #if ((DEBUG) & DEBUG_I)
1012                                                 emit_queue++;
1013 #endif                          
1014                                                 pthread_mutex_unlock(&emit_mutex);
1015                                                 flow = *flowpp;
1016                                                 continue;
1017                                         }
1018                                 }
1019                                 flowpp = &flow->next;
1020                                 flow = flow->next;
1021                         } /* chain loop */
1022                         pthread_mutex_unlock(&flows_mutex[i]);
1023                 } /* hash loop */
1024                 if (flows_emit) pthread_cond_signal(&emit_cond);
1025
1026                 while (scan_frag_dreg) {
1027                         flow = scan_frag_dreg;
1028                         scan_frag_dreg = flow->next;
1029 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1030                         *logbuf = 0;
1031 #endif
1032                         put_into(flow, MOVE_INTO
1033 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1034                                 , logbuf
1035 #endif
1036                         );
1037 #if ((DEBUG) & DEBUG_S)
1038                         my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
1039 #endif
1040                 }
1041         }
1042 }
1043
1044 void *cap_thread()
1045 {
1046         struct ulog_packet_msg *ulog_msg;
1047         struct ip *nl;
1048         void *tl;
1049         struct Flow *flow;
1050         int len, off_frag, psize;
1051 #if ((DEBUG) & DEBUG_C)
1052         char buf[64];
1053         char logbuf[256];
1054 #endif
1055
1056         setuser();
1057
1058         while (!killed) {
1059                 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1060                 if (len <= 0) {
1061                         my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1062                         continue;
1063                 }
1064                 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1065
1066 #if ((DEBUG) & DEBUG_C)
1067                         sprintf(logbuf, "C: %d", ulog_msg->data_len);
1068 #endif
1069
1070                         nl = (void *) &ulog_msg->payload;
1071                         psize = ulog_msg->data_len;
1072
1073                         /* Sanity check */
1074                         if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1075 #if ((DEBUG) & DEBUG_C)
1076                                 strcat(logbuf, " U");
1077                                 my_log(LOG_DEBUG, "%s", logbuf);
1078 #endif
1079 #if ((DEBUG) & DEBUG_I)
1080                                 pkts_ignored++;
1081 #endif
1082                                 continue;
1083                         }
1084
1085                         if (pending_head->flags) {
1086 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1087                                 my_log(LOG_ERR,
1088 # if ((DEBUG) & DEBUG_C)
1089                                         "%s %s %s", logbuf,
1090 # else
1091                                         "%s %s",
1092 # endif
1093                                         "pending queue full:", "packet lost");
1094 #endif
1095 #if ((DEBUG) & DEBUG_I)
1096                                 pkts_lost_capture++;
1097 #endif
1098                                 goto done;
1099                         }
1100
1101 #if ((DEBUG) & DEBUG_I)
1102                         pkts_total++;
1103 #endif
1104
1105                         flow = pending_head;
1106
1107                         /* ?FIXME? Add sanity check for ip_len? */
1108                         flow->size = ntohs(nl->ip_len);
1109 #if ((DEBUG) & DEBUG_I)
1110                         size_total += flow->size;
1111 #endif
1112
1113                         flow->sip = nl->ip_src;
1114                         flow->dip = nl->ip_dst;
1115                         if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
1116                                 my_log(LOG_INFO, "Received test flow to corewars.org");
1117                         }
1118                         flow->iif = snmp_index(ulog_msg->indev_name);
1119                         flow->oif = snmp_index(ulog_msg->outdev_name);
1120                         flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1121                         flow->proto = nl->ip_p;
1122                         flow->id = 0;
1123                         flow->tcp_flags = 0;
1124                         flow->pkts = 1;
1125                         flow->sizeF = 0;
1126                         flow->sizeP = 0;
1127                         /* Packets captured from OUTPUT table didn't contains valid timestamp */
1128                         if (ulog_msg->timestamp_sec) {
1129                                 flow->ctime.sec = ulog_msg->timestamp_sec;
1130                                 flow->ctime.usec = ulog_msg->timestamp_usec;
1131                         } else gettime(&flow->ctime);
1132                         flow->mtime = flow->ctime;
1133
1134                         off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1135
1136                         /*
1137                         Offset (from network layer) to transport layer header/IP data
1138                         IOW IP header size ;-)
1139
1140                         ?FIXME?
1141                         Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1142                         */
1143                         off_tl = nl->ip_hl << 2;
1144                         tl = (void *) nl + off_tl;
1145
1146                         /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1147                         flow->sizeF = ntohs(nl->ip_len) - off_tl;
1148                         psize -= off_tl;
1149                         if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1150                         if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1151
1152                         if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1153                                 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1154 #if ((DEBUG) & DEBUG_C)
1155                                 strcat(logbuf, " F");
1156 #endif
1157 #if ((DEBUG) & DEBUG_I)
1158                                 pkts_total_fragmented++;
1159 #endif
1160                                 flow->flags |= FLOW_FRAG;
1161                                 flow->id = nl->ip_id;
1162
1163                                 if (!(ntohs(nl->ip_off) & IP_MF)) {
1164                                         /* Packet whith IP_MF contains information about whole datagram size */
1165                                         flow->flags |= FLOW_LASTFRAG;
1166                                         /* size = frag_offset*8 + data_size */
1167                                         flow->sizeP = off_frag + flow->sizeF;
1168                                 }
1169                         }
1170
1171 #if ((DEBUG) & DEBUG_C)
1172                         sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1173                         strcat(logbuf, buf);
1174                         sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1175                         strcat(logbuf, buf);
1176 #endif
1177
1178                         /*
1179                         Fortunately most interesting transport layer information fit
1180                         into first 8 bytes of IP data field (minimal nonzero size).
1181                         Thus we don't need actual packet reassembling to build whole
1182                         transport layer data. We only check the fragment offset for
1183                         zero value to find packet with this information.
1184                         */
1185                         if (!off_frag && psize >= 8) {
1186                                 switch (flow->proto) {
1187                                         case IPPROTO_TCP:
1188                                         case IPPROTO_UDP:
1189                                                 flow->sp = ((struct udphdr *)tl)->uh_sport;
1190                                                 flow->dp = ((struct udphdr *)tl)->uh_dport;
1191                                                 goto tl_known;
1192
1193 #ifdef ICMP_TRICK
1194                                         case IPPROTO_ICMP:
1195                                                 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1196                                                 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1197                                                 goto tl_known;
1198 #endif
1199 #ifdef ICMP_TRICK_CISCO
1200                                         case IPPROTO_ICMP:
1201                                                 flow->dp = *((int32_t *) tl);
1202                                                 goto tl_known;
1203 #endif
1204
1205                                         default:
1206                                                 /* Unknown transport layer */
1207 #if ((DEBUG) & DEBUG_C)
1208                                                 strcat(logbuf, " U");
1209 #endif
1210                                                 flow->sp = 0;
1211                                                 flow->dp = 0;
1212                                                 break;
1213
1214                                         tl_known:
1215 #if ((DEBUG) & DEBUG_C)
1216                                                 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1217                                                 strcat(logbuf, buf);
1218 #endif
1219                                                 flow->flags |= FLOW_TL;
1220                                 }
1221                         }
1222
1223                         /* Check for tcp flags presence (including CWR and ECE). */
1224                         if (flow->proto == IPPROTO_TCP
1225                                 && off_frag < 16
1226                                 && psize >= 16 - off_frag) {
1227                                 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1228 #if ((DEBUG) & DEBUG_C)
1229                                 sprintf(buf, " TCP:%x", flow->tcp_flags);
1230                                 strcat(logbuf, buf);
1231 #endif
1232                         }
1233
1234 #if ((DEBUG) & DEBUG_C)
1235                         sprintf(buf, " => %x", (unsigned) flow);
1236                         strcat(logbuf, buf);
1237                         my_log(LOG_DEBUG, "%s", logbuf);
1238 #endif
1239
1240 #if ((DEBUG) & DEBUG_I)
1241                         pkts_pending++;
1242                         pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1243                         if (pending_queue_trace < pending_queue_trace_candidate)
1244                                 pending_queue_trace = pending_queue_trace_candidate;
1245 #endif
1246
1247                         /* Flow complete - inform unpending_thread() about it */
1248                         pending_head->flags |= FLOW_PENDING;
1249                         pending_head = pending_head->next;
1250                 done:
1251                         pthread_cond_signal(&unpending_cond);
1252                 }
1253         }
1254         return 0;
1255 }
1256
1257 /* Copied out of CoDemux */
1258
1259 static int init_daemon() {
1260   pid_t pid;
1261   FILE *pidfile;
1262
1263   pidfile = fopen(PIDFILE, "w");
1264   if (pidfile == NULL) {
1265     my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
1266   }
1267
1268   if ((pid = fork()) < 0) {
1269     fclose(pidfile);
1270     my_log(LOG_ERR, "Could not fork!\n");
1271     return(-1);
1272   }
1273   else if (pid != 0) {
1274     /* i'm the parent, writing down the child pid  */
1275     fprintf(pidfile, "%u\n", pid);
1276     fclose(pidfile);
1277     exit(0);
1278   }
1279
1280   /* close the pid file */
1281   fclose(pidfile);
1282
1283   /* routines for any daemon process
1284      1. create a new session 
1285      2. change directory to the root
1286      3. change the file creation permission 
1287   */
1288   setsid();
1289   chdir("/usr/local/fprobe");
1290   umask(0);
1291
1292   return(0);
1293 }
1294
1295 int main(int argc, char **argv)
1296 {
1297         char errpbuf[512];
1298         char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1299         int c, i, write_fd, memory_limit = 0;
1300         struct addrinfo hints, *res;
1301         struct sockaddr_in saddr;
1302         pthread_attr_t tattr;
1303         struct sigaction sigact;
1304         static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1305         struct timeval timeout;
1306
1307         sched_min = sched_get_priority_min(SCHED);
1308         sched_max = sched_get_priority_max(SCHED);
1309
1310         memset(&saddr, 0 , sizeof(saddr));
1311         memset(&hints, 0 , sizeof(hints));
1312         hints.ai_flags = AI_PASSIVE;
1313         hints.ai_family = AF_INET;
1314         hints.ai_socktype = SOCK_DGRAM;
1315
1316         /* Process command line options */
1317
1318         opterr = 0;
1319         while ((c = my_getopt(argc, argv, parms)) != -1) {
1320                 switch (c) {
1321                         case '?':
1322                                 usage();
1323
1324                         case 'h':
1325                                 usage();
1326                 }
1327         }
1328
1329         if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1330         if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1331         if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1332         if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1333         if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1334         if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1335         if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1336         if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1337         if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1338         if (parms[nflag].count) {
1339                 switch (atoi(parms[nflag].arg)) {
1340                         case 1:
1341                                 netflow = &NetFlow1;
1342                                 break;
1343
1344                         case 5:
1345                                 break;
1346
1347                         case 7:
1348                                 netflow = &NetFlow7;
1349                                 break;
1350
1351                         default:
1352                                 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1353                                 exit(1);
1354                 }
1355         }
1356         if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1357         if (parms[lflag].count) {
1358                 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1359                         *log_suffix++ = 0;
1360                         if (*log_suffix) {
1361                                 sprintf(errpbuf, "[%s]", log_suffix);
1362                                 strcat(ident, errpbuf);
1363                         }
1364                 }
1365                 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1366                 if (log_suffix) *--log_suffix = ':';
1367         }
1368         if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1369         err_malloc:
1370                 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1371                 exit(1);
1372         }
1373         sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1374         if (parms[qflag].count) {
1375                 pending_queue_length = atoi(parms[qflag].arg);
1376                 if (pending_queue_length < 1) {
1377                         fprintf(stderr, "Illegal %s\n", "pending queue length");
1378                         exit(1);
1379                 }
1380         }
1381         if (parms[rflag].count) {
1382                 schedp.sched_priority = atoi(parms[rflag].arg);
1383                 if (schedp.sched_priority
1384                         && (schedp.sched_priority < sched_min
1385                                 || schedp.sched_priority > sched_max)) {
1386                         fprintf(stderr, "Illegal %s\n", "realtime priority");
1387                         exit(1);
1388                 }
1389         }
1390         if (parms[Bflag].count) {
1391                 sockbufsize = atoi(parms[Bflag].arg) << 10;
1392         }
1393         if (parms[bflag].count) {
1394                 bulk_quantity = atoi(parms[bflag].arg);
1395                 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1396                         fprintf(stderr, "Illegal %s\n", "bulk size");
1397                         exit(1);
1398                 }
1399         }
1400         if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1401         if (parms[Xflag].count) {
1402                 for(i = 0; parms[Xflag].arg[i]; i++)
1403                         if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1404                 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1405                         goto err_malloc;
1406                 rule = strtok(parms[Xflag].arg, ":");
1407                 for (i = 0; rule; i++) {
1408                         snmp_rules[i].len = strlen(rule);
1409                         if (snmp_rules[i].len > IFNAMSIZ) {
1410                                 fprintf(stderr, "Illegal %s\n", "interface basename");
1411                                 exit(1);
1412                         }
1413                         strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1414                         if (!*(rule - 1)) *(rule - 1) = ',';
1415                         rule = strtok(NULL, ",");
1416                         if (!rule) {
1417                                 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1418                                 exit(1);
1419                         }
1420                         snmp_rules[i].base = atoi(rule);
1421                         *(rule - 1) = ':';
1422                         rule = strtok(NULL, ":");
1423                 }
1424                 nsnmp_rules = i;
1425         }
1426         if (parms[tflag].count)
1427                 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1428         if (parms[aflag].count) {
1429                 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1430                 bad_lhost:
1431                         fprintf(stderr, "Illegal %s\n", "source address");
1432                         exit(1);
1433                 } else {
1434                         saddr = *((struct sockaddr_in *) res->ai_addr);
1435                         freeaddrinfo(res);
1436                 }
1437         }
1438         if (parms[uflag].count) 
1439                 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1440                         fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1441                         exit(1);
1442                 }
1443
1444
1445         /* Process collectors parameters. Brrrr... :-[ */
1446
1447         npeers = argc - optind;
1448         if (npeers > 1) {
1449                 /* Send to remote Netflow collector */
1450                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1451                 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1452                         dhost = argv[i];
1453                         if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1454                         *dport++ = 0;
1455                         if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1456                                 fprintf(stderr, "socket(): %s\n", strerror(errno));
1457                                 exit(1);
1458                         }
1459                         peers[npeers].write_fd = write_fd;
1460                         peers[npeers].type = PEER_MIRROR;
1461                         peers[npeers].laddr = saddr;
1462                         peers[npeers].seq = 0;
1463                         if ((lhost = strchr(dport, '/'))) {
1464                                 *lhost++ = 0;
1465                                 if ((type = strchr(lhost, '/'))) {
1466                                         *type++ = 0;
1467                                         switch (*type) {
1468                                                 case 0:
1469                                                 case 'm':
1470                                                         break;
1471
1472                                                 case 'r':
1473                                                         peers[npeers].type = PEER_ROTATE;
1474                                                         npeers_rot++;
1475                                                         break;
1476
1477                                                 default:
1478                                                         goto bad_collector;
1479                                         }
1480                                 }
1481                                 if (*lhost) {
1482                                         if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1483                                         peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1484                                         freeaddrinfo(res);
1485                                 }
1486                         }
1487                         if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1488                                                 sizeof(struct sockaddr_in))) {
1489                                 fprintf(stderr, "bind(): %s\n", strerror(errno));
1490                                 exit(1);
1491                         }
1492                         if (getaddrinfo(dhost, dport, &hints, &res)) {
1493 bad_collector:
1494                                 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1495                                 exit(1);
1496                         }
1497                         peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1498                         freeaddrinfo(res);
1499                         if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1500                                                 sizeof(struct sockaddr_in))) {
1501                                 fprintf(stderr, "connect(): %s\n", strerror(errno));
1502                                 exit(1);
1503                         }
1504
1505                         /* Restore command line */
1506                         if (type) *--type = '/';
1507                         if (lhost) *--lhost = '/';
1508                         *--dport = ':';
1509                 }
1510         }
1511         else if (parms[fflag].count) {
1512                 // log into a file
1513                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1514                 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1515                 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1516                 
1517                 peers[npeers].write_fd = START_VALUE;
1518                 peers[npeers].type = PEER_FILE;
1519                 peers[npeers].seq = 0;
1520
1521                 get_cur_epoch();
1522                 npeers++;
1523         }
1524         else 
1525                 usage();
1526
1527
1528         if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1529         ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1530         if (!ulog_handle) {
1531                 fprintf(stderr, "libipulog initialization error: %s",
1532                         ipulog_strerror(ipulog_errno));
1533                 exit(1);
1534         }
1535         if (sockbufsize)
1536                 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1537                         &sockbufsize, sizeof(sockbufsize)) < 0)
1538                         fprintf(stderr, "setsockopt(): %s", strerror(errno));
1539
1540         /* Daemonize (if log destination stdout-free) */
1541
1542         my_log_open(ident, verbosity, log_dest);
1543
1544         init_daemon();
1545
1546         if (!(log_dest & 2)) {
1547                 /* Crash-proofing - Sapan*/
1548                 while (1) {
1549                         int pid=fork();
1550                         if (pid==-1) {
1551                                         fprintf(stderr, "fork(): %s", strerror(errno));
1552                                         exit(1);
1553                         }
1554                         else if (pid==0) {
1555                                         setsid();
1556                                         freopen("/dev/null", "r", stdin);
1557                                         freopen("/dev/null", "w", stdout);
1558                                         freopen("/dev/null", "w", stderr);
1559                                         break;
1560                         }
1561                         else {
1562                                 while (wait3(NULL,0,NULL) < 1);
1563                         }
1564                 }
1565         } else {
1566                 setvbuf(stdout, (char *)0, _IONBF, 0);
1567                 setvbuf(stderr, (char *)0, _IONBF, 0);
1568         }
1569
1570         pid = getpid();
1571         sprintf(errpbuf, "[%ld]", (long) pid);
1572         strcat(ident, errpbuf);
1573
1574         /* Initialization */
1575
1576         hash_init(); /* Actually for crc16 only */
1577         mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1578         for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1579
1580 #ifdef UPTIME_TRICK
1581         /* Hope 12 days is enough :-/ */
1582         start_time_offset = 1 << 20;
1583
1584         /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1585 #endif
1586         gettime(&start_time);
1587
1588         /*
1589         Build static pending queue as circular buffer.
1590         */
1591         if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1592         pending_tail = pending_head;
1593         for (i = pending_queue_length - 1; i--;) {
1594                 if (!(pending_tail->next = mem_alloc())) {
1595                 err_mem_alloc:
1596                         my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1597                         exit(1);
1598                 }
1599                 pending_tail = pending_tail->next;
1600         }
1601         pending_tail->next = pending_head;
1602         pending_tail = pending_head;
1603
1604         sigemptyset(&sig_mask);
1605         sigact.sa_handler = &sighandler;
1606         sigact.sa_mask = sig_mask;
1607         sigact.sa_flags = 0;
1608         sigaddset(&sig_mask, SIGTERM);
1609         sigaction(SIGTERM, &sigact, 0);
1610 #if ((DEBUG) & DEBUG_I)
1611         sigaddset(&sig_mask, SIGUSR1);
1612         sigaction(SIGUSR1, &sigact, 0);
1613 #endif
1614         if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1615                 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1616                 exit(1);
1617         }
1618
1619         my_log(LOG_INFO, "Starting %s...", VERSION);
1620
1621         if (parms[cflag].count) {
1622                 if (chdir(parms[cflag].arg) || chroot(".")) {
1623                         my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1624                         exit(1);
1625                 }
1626         }
1627
1628         schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1629         pthread_attr_init(&tattr);
1630         for (i = 0; i < THREADS - 1; i++) {
1631                 if (schedp.sched_priority > 0) {
1632                         if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1633                                 (pthread_attr_setschedparam(&tattr, &schedp))) {
1634                                 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1635                                 exit(1);
1636                         }
1637                 }
1638                 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1639                         my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1640                         exit(1);
1641                 }
1642                 pthread_detach(thid);
1643                 schedp.sched_priority++;
1644         }
1645
1646         if (pw) {
1647                 if (setgroups(0, NULL)) {
1648                         my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1649                         exit(1);
1650                 }
1651                 if (setregid(pw->pw_gid, pw->pw_gid)) {
1652                         my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1653                         exit(1);
1654                 }
1655                 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1656                         my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1657                         exit(1);
1658                 }
1659         }
1660
1661         if (!(pidfile = fopen(pidfilepath, "w")))
1662                 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1663         else {
1664                 fprintf(pidfile, "%ld\n", (long) pid);
1665                 fclose(pidfile);
1666         }
1667
1668         my_log(LOG_INFO, "pid: %d", pid);
1669         my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1670                 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1671                 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1672                 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1673                 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1674                 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1675                 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1676         for (i = 0; i < nsnmp_rules; i++) {
1677                 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1678                         i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1679         }
1680         for (i = 0; i < npeers; i++) {
1681                 switch (peers[i].type) {
1682                         case PEER_MIRROR:
1683                                 c = 'm';
1684                                 break;
1685                         case PEER_ROTATE:
1686                                 c = 'r';
1687                                 break;
1688                 }
1689                 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1690                 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1691                         inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1692         }
1693
1694         pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1695
1696         timeout.tv_usec = 0;
1697         while (!killed
1698                 || (total_elements - free_elements - pending_queue_length)
1699                 || emit_count
1700                 || pending_tail->flags) {
1701
1702                 if (!sigs) {
1703                         timeout.tv_sec = scan_interval;
1704                         select(0, 0, 0, 0, &timeout);
1705                 }
1706
1707                 if (sigs & SIGTERM_MASK && !killed) {
1708                         sigs &= ~SIGTERM_MASK;
1709                         my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1710                         scan_interval = 1;
1711                         frag_lifetime = -1;
1712                         active_lifetime = -1;
1713                         inactive_lifetime = -1;
1714                         emit_timeout = 1;
1715                         unpending_timeout = 1;
1716                         killed = 1;
1717                         pthread_cond_signal(&scan_cond);
1718                         pthread_cond_signal(&unpending_cond);
1719                 }
1720
1721 #if ((DEBUG) & DEBUG_I)
1722                 if (sigs & SIGUSR1_MASK) {
1723                         sigs &= ~SIGUSR1_MASK;
1724                         info_debug();
1725                 }
1726 #endif
1727         }
1728         remove(pidfilepath);
1729 #if ((DEBUG) & DEBUG_I)
1730         info_debug();
1731 #endif
1732         my_log(LOG_INFO, "Done.");
1733 #ifdef WALL
1734         return 0;
1735 #endif
1736 }