[fprobe-ulog.git] / src / fprobe-ulog.c
1 /*
2         Copyright (C) Slava Astashonok <sla@0n.ru>
3
4         This program is free software; you can redistribute it and/or
5         modify it under the terms of the GNU General Public License.
6
7         $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
8
9                         Sapan Bhatia <sapanb@cs.princeton.edu> 
10                         
11         7/11/2007       Added data collection (-f) functionality, xid support in the header and log file
12                         rotation.
13
14         15/11/2007      Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
15
16 */
17
18 #include <common.h>
19
20 /* stdout, stderr, freopen() */
21 #include <stdio.h>
22
23 /* atoi(), exit() */
24 #include <stdlib.h>
25
26         /* getopt(), alarm(), getpid(), setsid(), chdir() */
27 #include <unistd.h>
28
29 /* strerror() */
30 #include <string.h>
31
32 /* sig*() */
33 #include <signal.h>
34
35 /* statfs() */
36
37 #include <sys/vfs.h>
38
39 #include <libipulog/libipulog.h>
40 struct ipulog_handle {
41         int fd;
42         u_int8_t blocking;
43         struct sockaddr_nl local;
44         struct sockaddr_nl peer;
45         struct nlmsghdr* last_nlhdr;
46 };
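/*
 * NOTE: this duplicates libipulog's otherwise opaque handle layout,
 * presumably so the probe can reach the underlying netlink socket
 * directly.  It has to be kept in sync with the definition inside
 * libipulog itself.
 */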
47
48 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
49 #include <sys/types.h>
50 #include <netinet/in_systm.h>
51 #include <sys/socket.h>
52 #include <netinet/in.h>
53 #include <arpa/inet.h>
54 #include <netinet/ip.h>
55 #include <netinet/tcp.h>
56 #include <netinet/udp.h>
57 #include <netinet/ip_icmp.h>
58 #include <net/if.h>
59
60 #include <sys/param.h>
61 #include <pwd.h>
62 #ifdef OS_LINUX
63 #include <grp.h>
64 #endif
65
66 /* pthread_*() */
67 #include <pthread.h>
68
69 /* errno */
70 #include <errno.h>
71
72 /* getaddrinfo() */
73 #include <netdb.h>
74
75 /* nanosleep() */
76 #include <time.h>
77
78 /* gettimeofday() */
79 #include <sys/time.h>
80
81 /* scheduling */
82 #include <sched.h>
83
84 /* select() (POSIX)*/
85 #include <sys/select.h>
86
87 /* open() */
88 #include <sys/stat.h>
89 #include <fcntl.h>
90
91 #include <fprobe-ulog.h>
92 #include <my_log.h>
93 #include <my_getopt.h>
94 #include <netflow.h>
95 #include <hash.h>
96 #include <mem.h>
97
98 #define PIDFILE                 "/var/log/fprobe-ulog.pid"
99 #define LAST_EPOCH_FILE         "/var/log/fprobe_last_epoch"
100 #define MAX_EPOCH_SIZE          sizeof("32767")
101 #define STD_NETFLOW_PDU
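/*
 * MAX_EPOCH_SIZE leaves room for a five-digit epoch number plus the
 * terminating NUL.  Defining STD_NETFLOW_PDU makes emit_thread() pad
 * every emitted PDU up to NETFLOW_PDU_SIZE (see below).
 */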
102
103 enum {
104         aflag,
105         Bflag,
106         bflag,
107         cflag,
108         dflag,
109         Dflag,
110         eflag,
111         Eflag,
112         fflag,
113         gflag,
114         hflag,
115         lflag,
116         mflag,
117         Mflag,
118         nflag,
119         qflag,
120         rflag,
121         sflag,
122         tflag,
123         Tflag,
124         Uflag,
125         uflag,
126         vflag,
127         Wflag,
128         Xflag,
129 };
130
131 static struct getopt_parms parms[] = {
132         {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
133         {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
134         {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
135         {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
136         {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
137         {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
138         {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
139         {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
140         {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
141         {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
142         {'h', 0, 0, 0},
143         {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
144         {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
145         {'M', 0, 0, 0},
146         {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
147         {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
148         {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
149         {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
150         {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
151         {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
152         {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
153         {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
154         {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
                {'W', MY_GETOPT_ARG_REQUIRED, 0, 0},
155         {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
156         {0, 0, 0, 0}
157 };
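/*
 * The option-flag enum above and this parms[] table are parallel
 * arrays: option values are read as parms[<x>flag].arg/.count, so the
 * order of entries here must match the enum exactly.
 */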
158
159 extern char *optarg;
160 extern int optind, opterr, optopt;
161 extern int errno;
162
163 extern struct NetFlow NetFlow1;
164 extern struct NetFlow NetFlow5;
165 extern struct NetFlow NetFlow7;
166
167 #define START_DATA_FD -5
168 #define mark_is_tos parms[Mflag].count
169 static unsigned scan_interval = 5;
170 static unsigned int min_free = 0;
171 static int frag_lifetime = 30;
172 static int inactive_lifetime = 60;
173 static int active_lifetime = 300;
174 static int sockbufsize;
175 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
176 #if (MEM_BITS == 0) || (MEM_BITS == 16)
177 #define BULK_QUANTITY 10000
178 #else
179 #define BULK_QUANTITY 200
180 #endif
181
182 static unsigned epoch_length=60, log_epochs=1; 
183 static unsigned cur_epoch=0,prev_uptime=0,last_peak=0;
184
185 static unsigned bulk_quantity = BULK_QUANTITY;
186 static unsigned pending_queue_length = 100;
187 static struct NetFlow *netflow = &NetFlow5;
188 static unsigned verbosity = 6;
189 static unsigned log_dest = MY_LOG_SYSLOG;
190 static struct Time start_time;
191 static long start_time_offset;
192 static int off_tl;
193 /* From mem.c */
194 extern unsigned total_elements;
195 extern unsigned free_elements;
196 extern unsigned total_memory;
197 #if ((DEBUG) & DEBUG_I)
198 static unsigned emit_pkts, emit_queue;
199 static uint64_t size_total;
200 static unsigned pkts_total, pkts_total_fragmented;
201 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
202 static unsigned pkts_pending, pkts_pending_done;
203 static unsigned pending_queue_trace, pending_queue_trace_candidate;
204 static unsigned flows_total, flows_fragmented;
205 #endif
206 static unsigned emit_count;
207 static uint32_t emit_sequence;
208 static unsigned emit_rate_bytes, emit_rate_delay;
209 static struct Time emit_time;
210 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
211 static pthread_t thid;
212 static sigset_t sig_mask;
213 static struct sched_param schedp;
214 static int sched_min, sched_max;
215 static int npeers, npeers_rot;
216 static struct peer *peers;
217 static int sigs;
218
219 static struct Flow *flows[1 << HASH_BITS];
220 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
221
222 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
223 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
224
225 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
226 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
227 static struct Flow *pending_head, *pending_tail;
228 static struct Flow *scan_frag_dreg;
229
230 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
231 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
232 static struct Flow *flows_emit;
233
234 static char ident[256] = "fprobe-ulog";
235 static FILE *pidfile;
236 static char *pidfilepath;
237 static pid_t pid;
238 static int killed;
239 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
240 static struct ipulog_handle *ulog_handle;
241 static uint32_t ulog_gmask = 1;
242 static char *cap_buf;
243 static int nsnmp_rules;
244 static struct snmp_rule *snmp_rules;
245 static struct passwd *pw = 0;
246
247 void usage()
248 {
249         fprintf(stdout,
250                         "fprobe-ulog: a NetFlow probe. Version %s\n"
251                         "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
252                         "\n"
253                         "-h\t\tDisplay this help\n"
254                         "-U <mask>\tULOG group bitwise mask [1]\n"
255                         "-s <seconds>\tHow often to scan for expired flows [5]\n"
256                         "-g <seconds>\tFragmented flow lifetime [30]\n"
257                         "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
258                         "-f <filename>\tLog flow data in a file\n"
259                         "-G <graduation period>\tRotate logs on a 0=hourly, 1=daily basis\n"
260                         "-n <version>\tNetFlow version to use (1, 5 or 7) [5]\n"
261                         "-a <address>\tUse <address> as the source address for NetFlow packets\n"
262                         "-X <rules>\tInterface name to SNMP-index conversion rules\n"
263                         "-M\t\tUse netfilter mark value as ToS flag\n"
264                         "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
265                         "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
266                         "-q <flows>\tPending queue length [100]\n"
267                         "-B <kilobytes>\tKernel capture buffer size [0]\n"
268                         "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
269                         "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
270                         "-c <directory>\tDirectory to chroot to\n"
271                         "-u <user>\tUser to run as\n"
272                         "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
273                         "-l <[dst][:id]>\tLog destination and log/pidfile identifier [1]\n"
274                         "-y <remote:port>\tAddress of the NetFlow collector\n"
275                         "-f <writable file>\tFile to write data into\n"
276                         "-T <n>\tRotate log file every n epochs\n"
277                         "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
278                         "-E <[1..60]>\tSize of an epoch in minutes\n"
279                         "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
280                         ,
281                 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
282         exit(0);
283 }
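/*
 * Illustrative invocation (paths and values are examples, not taken
 * from this source):
 *
 *   fprobe-ulog -U 1 -f /var/local/fprobe/flows -E 1 -T 60 -D 1000
 *
 * i.e. capture ULOG group 1, write NetFlow v5 PDUs to
 * /var/local/fprobe/flows.<epoch>, use one-minute epochs, cycle
 * through 60 of them, and start dropping data when fewer than 1000
 * disk blocks remain free.
 */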
284
285 #if ((DEBUG) & DEBUG_I)
286 void info_debug()
287 {
288         my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
289                         pkts_total, pkts_total_fragmented, size_total,
290                         pkts_pending - pkts_pending_done, pending_queue_trace);
291         my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
292                         pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
293         my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
294                         flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
295         my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
296                         total_elements, free_elements, total_memory);
297 }
298 #endif
299
300 void sighandler(int sig)
301 {
302         switch (sig) {
303                 case SIGTERM:
304                         sigs |= SIGTERM_MASK;
305                         break;
306 #if ((DEBUG) & DEBUG_I)
307                 case SIGUSR1:
308                         sigs |= SIGUSR1_MASK;
309                         break;
310 #endif
311         }
312 }
313
314 void gettime(struct Time *now)
315 {
316         struct timeval t;
317
318         gettimeofday(&t, 0);
319         now->sec = t.tv_sec;
320         now->usec = t.tv_usec;
321 }
322
323
324 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
325 {
326         return (t1->sec - t2->sec)/60;
327 }
328
329 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
330 {
331         return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
332 }
333
334 /* Uptime in milliseconds */
335 uint32_t getuptime(struct Time *t)
336 {
337         /* Maximum uptime is about 49/2 days */
338         return cmpmtime(t, &start_time);
339 }
340
341 /* Uptime in minutes */
342 uint32_t getuptime_minutes(struct Time *t)
343 {
344         /* Unlike the millisecond counter, this one will not wrap for thousands of years */
345         return cmpMtime(t, &start_time);
346 }
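/*
 * getuptime()/getuptime_minutes() measure elapsed time since start_time;
 * the millisecond value feeds the NETFLOW_UPTIME, NETFLOW_FIRST_SWITCHED
 * and NETFLOW_LAST_SWITCHED fields in fill() below, the minute value
 * drives epoch rotation in get_data_file_fd().
 */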
347
348 hash_t hash_flow(struct Flow *flow)
349 {
350         if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
351         else return hash(flow, sizeof(struct Flow_TL));
352 }
353
354 uint16_t snmp_index(char *name) {
355         uint32_t i;
356
357         if (!*name) return 0;
358
359         for (i = 0; (int) i < nsnmp_rules; i++) {
360                 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
361                 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
362         }
363
364         if ((i = if_nametoindex(name))) return i;
365
366         return -1;
367 }
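/*
 * snmp_index() maps an interface name to an SNMP ifIndex.  A -X rule
 * with basename "eth" and base 100 would, for example, map "eth2" to
 * 102 (illustrative values); names with no matching rule fall back to
 * if_nametoindex().
 */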
368
369 inline void copy_flow(struct Flow *src, struct Flow *dst)
370 {
371         dst->iif = src->iif;
372         dst->oif = src->oif;
373         dst->sip = src->sip;
374         dst->dip = src->dip;
375         dst->tos = src->tos;
376         dst->proto = src->proto;
377         dst->tcp_flags = src->tcp_flags;
378         dst->id = src->id;
379         dst->sp = src->sp;
380         dst->dp = src->dp;
381         dst->pkts = src->pkts;
382         dst->size = src->size;
383         dst->sizeF = src->sizeF;
384         dst->sizeP = src->sizeP;
385         dst->ctime = src->ctime;
386         dst->mtime = src->mtime;
387         dst->flags = src->flags;
388 }
389
390 void read_cur_epoch() {
391         int fd;
392         /* Reset to -1 in case the read fails */
393         cur_epoch=-1;
394         fd = open(LAST_EPOCH_FILE, O_RDONLY);
395         if (fd != -1) {
396                 char snum[MAX_EPOCH_SIZE];
397                 ssize_t len;
398                 len = read(fd, snum, MAX_EPOCH_SIZE-1);
399                 if (len != -1) {
400                         snum[len]='\0';
401                         sscanf(snum,"%d",&cur_epoch);
402                         cur_epoch++; /* Don't overwrite the last epoch's file */
403                 }
404                 close(fd);
405         }
406         return;
407 }
408
409
410 /* Dumps the current epoch in a file to cope with
411  * reboots and killings of fprobe */
412
413 void update_cur_epoch_file(int n) {
414         int fd, len;
415         char snum[MAX_EPOCH_SIZE];
416         len=snprintf(snum, MAX_EPOCH_SIZE-1,"%d", n);
417         fd = open(LAST_EPOCH_FILE, O_WRONLY|O_CREAT|O_TRUNC, 0644);
418         if (fd == -1) {
419                 my_log(LOG_ERR, "open() failed: %s. The next restart will resume logging from epoch id 0.", LAST_EPOCH_FILE);
420                 return;
421         }
422         write(fd, snum, len);
423         close(fd);
424 }
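/*
 * Together, read_cur_epoch() and update_cur_epoch_file() persist the
 * epoch counter across restarts via LAST_EPOCH_FILE; the -W option can
 * be used to set the starting epoch explicitly instead.
 */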
425
426 /* Get the file descriptor corresponding to the current file.
427  * The kludgy implementation is to abstract away the 'current
428  * file descriptor', which may also be a socket. 
429  */
430
431 unsigned get_data_file_fd(char *fname, int cur_fd) {
432         struct Time now;
433         unsigned cur_uptime;
434
435         struct statfs statfs;
436         int ret_fd;
437
438         /* If the amount of free space left on the disk falls below the threshold, start reusing
439          * old logs; if that still doesn't help, drop data. */
440         gettime(&now);
441         cur_uptime = getuptime_minutes(&now);
442
443         if (cur_fd != START_DATA_FD) {
444                 if (fstatfs(cur_fd, &statfs) == -1) {
445                         my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
446                 }
447                 else {
448                         if (min_free && (statfs.f_bavail < min_free)
449                                         && (cur_epoch==last_peak))
450                         {
451                                 my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
452                                 cur_epoch = -1;
453                         }
454                         /*
455                            else 
456                            assume that we can reclaim space by overwriting our own files
457                            and that the difference in size will not fill the disk - sapan
458                            */
459                 }
460         }
461
462         /* If epoch length has been exceeded, 
463          * or we're starting up 
464          * or we're going back to the first epoch */
465         if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
466                 char nextname[MAX_PATH_LEN];
467                 int write_fd;
468                 prev_uptime = cur_uptime;
469                 cur_epoch = (cur_epoch + 1) % log_epochs;
470                 if (cur_epoch>last_peak) last_peak = cur_epoch;
471                 if (cur_fd>0)
472                         close(cur_fd);
473                 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
474                 if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC, 0644)) < 0) {
475                         my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
476                         exit(1);
477                 }
478                 update_cur_epoch_file(cur_epoch);
479                 ret_fd = write_fd;
480         }
481         else {
482                 ret_fd = cur_fd;
483         }
484         return(ret_fd);
485 }
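/*
 * Data files are named "<fname>.<epoch>".  The epoch advances every
 * epoch_length minutes and wraps modulo log_epochs, so with -T 3 the
 * probe cycles through <fname>.0, <fname>.1 and <fname>.2
 * (illustrative).  When fewer than min_free blocks are left on disk,
 * the epoch wraps back to 0 and existing files are overwritten.
 */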
486
487 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
488 {
489         struct Flow **flowpp;
490
491 #ifdef WALL
492         flowpp = 0;
493 #endif
494
495         if (prev) flowpp = *prev;
496
497         while (where) {
498                 if (where->sip.s_addr == what->sip.s_addr
499                                 && where->dip.s_addr == what->dip.s_addr
500                                 && where->proto == what->proto) {
501                         switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
502                                 case 0:
503                                         /* Both unfragmented */
504                                         if ((what->sp == where->sp)
505                                                         && (what->dp == where->dp)) goto done;
506                                         break;
507                                 case 2:
508                                         /* Both fragmented */
509                                         if (where->id == what->id) goto done;
510                                         break;
511                         }
512                 }
513                 flowpp = &where->next;
514                 where = where->next;
515         }
516 done:
517         if (prev) *prev = flowpp;
518         return where;
519 }
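/*
 * find() looks up a flow in a hash chain: unfragmented flows must match
 * on addresses, protocol and ports; fragmented ones on addresses,
 * protocol and IP id.  *prev is left pointing at the matching chain
 * link so the caller can unlink the entry.
 */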
520
521         int put_into(struct Flow *flow, int flag
522 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
523                         , char *logbuf
524 #endif
525                     )
526 {
527         int ret = 0;
528         hash_t h;
529         struct Flow *flown, **flowpp;
530 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
531         char buf[64];
532 #endif
533
534         h = hash_flow(flow);
535 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
536         sprintf(buf, " %x H:%04x", (unsigned) flow, h);
537         strcat(logbuf, buf);
538 #endif
539         pthread_mutex_lock(&flows_mutex[h]);
540         flowpp = &flows[h];
541         if (!(flown = find(flows[h], flow, &flowpp))) {
542                 /* No suitable flow found - add */
543                 if (flag == COPY_INTO) {
544                         if ((flown = mem_alloc())) {
545                                 copy_flow(flow, flown);
546                                 flow = flown;
547                         } else {
548 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
549                                 my_log(LOG_ERR, "%s %s. %s",
550                                                 "mem_alloc():", strerror(errno), "packet lost");
551 #endif
552                                 return -1;
553                         }
554                 }
555                 flow->next = flows[h];
556                 flows[h] = flow;
557 #if ((DEBUG) & DEBUG_I)
558                 flows_total++;
559                 if (flow->flags & FLOW_FRAG) flows_fragmented++;
560 #endif
561 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
562                 if (flown) {
563                         sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
564                         strcat(logbuf, buf);
565                 }
566 #endif
567         } else {
568                 /* Found suitable flow - update */
569 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
570                 sprintf(buf, " +> %x", (unsigned) flown);
571                 strcat(logbuf, buf);
572 #endif
573                 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
574                         flown->mtime = flow->mtime;
575                 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
576                         flown->ctime = flow->ctime;
577                 flown->tcp_flags |= flow->tcp_flags;
578                 flown->size += flow->size;
579                 flown->pkts += flow->pkts;
580                 if (flow->flags & FLOW_FRAG) {
581                         /* Fragmented flows require some additional work */
582                         if (flow->flags & FLOW_TL) {
583                                 /*
584                                    ?FIXME?
585                                    Several packets with FLOW_TL (attack)
586                                    */
587                                 flown->sp = flow->sp;
588                                 flown->dp = flow->dp;
589                         }
590                         if (flow->flags & FLOW_LASTFRAG) {
591                                 /*
592                                    ?FIXME?
593                                    Several packets with FLOW_LASTFRAG (attack)
594                                    */
595                                 flown->sizeP = flow->sizeP;
596                         }
597                         flown->flags |= flow->flags;
598                         flown->sizeF += flow->sizeF;
599                         if ((flown->flags & FLOW_LASTFRAG)
600                                         && (flown->sizeF >= flown->sizeP)) {
601                                 /* All fragments received - flow reassembled */
602                                 *flowpp = flown->next;
603                                 pthread_mutex_unlock(&flows_mutex[h]);
604 #if ((DEBUG) & DEBUG_I)
605                                 flows_total--;
606                                 flows_fragmented--;
607 #endif
608                                 flown->id = 0;
609                                 flown->flags &= ~FLOW_FRAG;
610 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
611                                 strcat(logbuf," R");
612 #endif
613                                 ret = put_into(flown, MOVE_INTO
614 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
615                                                 , logbuf
616 #endif
617                                               );
618                         }
619                 }
620                 if (flag == MOVE_INTO) mem_free(flow);
621         }
622         pthread_mutex_unlock(&flows_mutex[h]);
623         return ret;
624 }
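/*
 * put_into() inserts a flow into the hash table or merges it with an
 * existing entry.  COPY_INTO copies the caller's (pending-queue) flow
 * into a freshly mem_alloc()ed node; MOVE_INTO links the given node
 * directly.  Once the last fragment of a fragmented flow has arrived,
 * the reassembled flow is re-inserted without FLOW_FRAG.
 */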
625
626 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
627 {
628         int i;
629
630         for (i = 0; i < fields; i++) {
631 #if ((DEBUG) & DEBUG_F)
632                 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
633 #endif
634                 switch (format[i]) {
635                         case NETFLOW_IPV4_SRC_ADDR:
636                                 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
637                                 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
638                                 break;
639
640                         case NETFLOW_IPV4_DST_ADDR:
641                                 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
642                                 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
643                                         my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
644                                 }
645                                 p += NETFLOW_IPV4_DST_ADDR_SIZE;
646                                 break;
647
648                         case NETFLOW_INPUT_SNMP:
649                                 *((uint16_t *) p) = htons(flow->iif);
650                                 p += NETFLOW_INPUT_SNMP_SIZE;
651                                 break;
652
653                         case NETFLOW_OUTPUT_SNMP:
654                                 *((uint16_t *) p) = htons(flow->oif);
655                                 p += NETFLOW_OUTPUT_SNMP_SIZE;
656                                 break;
657
658                         case NETFLOW_PKTS_32:
659                                 *((uint32_t *) p) = htonl(flow->pkts);
660                                 p += NETFLOW_PKTS_32_SIZE;
661                                 break;
662
663                         case NETFLOW_BYTES_32:
664                                 *((uint32_t *) p) = htonl(flow->size);
665                                 p += NETFLOW_BYTES_32_SIZE;
666                                 break;
667
668                         case NETFLOW_FIRST_SWITCHED:
669                                 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
670                                 p += NETFLOW_FIRST_SWITCHED_SIZE;
671                                 break;
672
673                         case NETFLOW_LAST_SWITCHED:
674                                 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
675                                 p += NETFLOW_LAST_SWITCHED_SIZE;
676                                 break;
677
678                         case NETFLOW_L4_SRC_PORT:
679                                 *((uint16_t *) p) = flow->sp;
680                                 p += NETFLOW_L4_SRC_PORT_SIZE;
681                                 break;
682
683                         case NETFLOW_L4_DST_PORT:
684                                 *((uint16_t *) p) = flow->dp;
685                                 p += NETFLOW_L4_DST_PORT_SIZE;
686                                 break;
687
688                         case NETFLOW_PROT:
689                                 *((uint8_t *) p) = flow->proto;
690                                 p += NETFLOW_PROT_SIZE;
691                                 break;
692
693                         case NETFLOW_SRC_TOS:
694                                 *((uint8_t *) p) = flow->tos;
695                                 p += NETFLOW_SRC_TOS_SIZE;
696                                 break;
697
698                         case NETFLOW_TCP_FLAGS:
699                                 *((uint8_t *) p) = flow->tcp_flags;
700                                 p += NETFLOW_TCP_FLAGS_SIZE;
701                                 break;
702
703                         case NETFLOW_VERSION:
704                                 *((uint16_t *) p) = htons(netflow->Version);
705                                 p += NETFLOW_VERSION_SIZE;
706                                 break;
707
708                         case NETFLOW_COUNT:
709                                 *((uint16_t *) p) = htons(emit_count);
710                                 p += NETFLOW_COUNT_SIZE;
711                                 break;
712
713                         case NETFLOW_UPTIME:
714                                 *((uint32_t *) p) = htonl(getuptime(&emit_time));
715                                 p += NETFLOW_UPTIME_SIZE;
716                                 break;
717
718                         case NETFLOW_UNIX_SECS:
719                                 *((uint32_t *) p) = htonl(emit_time.sec);
720                                 p += NETFLOW_UNIX_SECS_SIZE;
721                                 break;
722
723                         case NETFLOW_UNIX_NSECS:
724                                 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
725                                 p += NETFLOW_UNIX_NSECS_SIZE;
726                                 break;
727
728                         case NETFLOW_FLOW_SEQUENCE:
729                                 //*((uint32_t *) p) = htonl(emit_sequence);
730                                 *((uint32_t *) p) = 0;
731                                 p += NETFLOW_FLOW_SEQUENCE_SIZE;
732                                 break;
733
734                         case NETFLOW_PAD8:
735                                 /* Unsupported (uint8_t) */
736                         case NETFLOW_ENGINE_TYPE:
737                         case NETFLOW_ENGINE_ID:
738                         case NETFLOW_FLAGS7_1:
739                         case NETFLOW_SRC_MASK:
740                         case NETFLOW_DST_MASK:
741                                 *((uint8_t *) p) = 0;
742                                 p += NETFLOW_PAD8_SIZE;
743                                 break;
744                         case NETFLOW_XID:
745                                 *((uint16_t *) p) = flow->tos;
746                                 p += NETFLOW_XID_SIZE;
747                                 break;
748                         case NETFLOW_PAD16:
749                                 /* Unsupported (uint16_t) */
750                         case NETFLOW_SRC_AS:
751                         case NETFLOW_DST_AS:
752                         case NETFLOW_FLAGS7_2:
753                                 *((uint16_t *) p) = 0;
754                                 p += NETFLOW_PAD16_SIZE;
755                                 break;
756
757                         case NETFLOW_PAD32:
758                                 /* Unsupported (uint32_t) */
759                         case NETFLOW_IPV4_NEXT_HOP:
760                         case NETFLOW_ROUTER_SC:
761                                 *((uint32_t *) p) = 0;
762                                 p += NETFLOW_PAD32_SIZE;
763                                 break;
764
765                         default:
766                                 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
767                                                 format, i, format[i]);
768                                 exit(1);
769                 }
770         }
771 #if ((DEBUG) & DEBUG_F)
772         my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
773 #endif
774         return p;
775 }
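/*
 * fill() serialises a flow (or, with flow == 0, the packet header)
 * according to the field list of the selected NetFlow version
 * (NetFlow1/NetFlow5/NetFlow7), writing each field in network byte
 * order and returning the advanced output pointer.
 */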
776
777 void setuser() {
778         /*
779            Workaround for clone()-based threads
780            Try to change EUID independently of main thread
781            */
782         if (pw) {
783                 setgroups(0, NULL);
784                 setregid(pw->pw_gid, pw->pw_gid);
785                 setreuid(pw->pw_uid, pw->pw_uid);
786         }
787 }
788
789 void *emit_thread()
790 {
791         struct Flow *flow;
792         void *p;
793         struct timeval now;
794         struct timespec timeout;
795         int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
796
797         p = (void *) &emit_packet + netflow->HeaderSize;
798         timeout.tv_nsec = 0;
799
800         setuser();
801
802         for (;;) {
803                 pthread_mutex_lock(&emit_mutex);
804                 while (!flows_emit) {
805                         gettimeofday(&now, 0);
806                         timeout.tv_sec = now.tv_sec + emit_timeout;
807                         /* Do not wait until emit_packet is full - that may take too long */
808                         if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
809                                 pthread_mutex_unlock(&emit_mutex);
810                                 goto sendit;
811                         }
812                 }
813                 flow = flows_emit;
814                 flows_emit = flows_emit->next;
815 #if ((DEBUG) & DEBUG_I)
816                 emit_queue--;
817 #endif          
818                 pthread_mutex_unlock(&emit_mutex);
819
820 #ifdef UPTIME_TRICK
821                 if (!emit_count) {
822                         gettime(&start_time);
823                         start_time.sec -= start_time_offset;
824                 }
825 #endif
826                 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
827                 mem_free(flow);
828                 emit_count++;
829 #ifdef PF2_DEBUG
830                 printf("Emit count = %d\n", emit_count);
831                 fflush(stdout);
832 #endif
833                 if (emit_count == netflow->MaxFlows) {
834 sendit:
835                         gettime(&emit_time);
836                         p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
837                         size = netflow->HeaderSize + emit_count * netflow->FlowSize;
838                         /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
839 #ifdef STD_NETFLOW_PDU
840                         if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
841 #endif
842                         peer_rot_cur = 0;
843                         for (i = 0; i < npeers; i++) {
844                                 if (peers[i].type == PEER_FILE) {
845                                         if (netflow->SeqOffset)
846                                                 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
847                                         peers[i].write_fd = get_data_file_fd(peers[i].fname, peers[i].write_fd);
848                                         ret = write(peers[i].write_fd, emit_packet, size);
849                                         if (ret < size) {
850
851 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
852                                                 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
853                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
854 #endif
855 #undef MESSAGES
856                                         }
857 #if ((DEBUG) & DEBUG_E)
858                                         commaneelse {
859                                                 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
860                                                                 emit_count, i + 1, peers[i].seq);
861                                         }
862 #endif
863                                         peers[i].seq += emit_count;
864
865                                         /* Rate limit */
866                                         if (emit_rate_bytes) {
867                                                 sent += size;
868                                                 delay = sent / emit_rate_bytes;
869                                                 if (delay) {
870                                                         sent %= emit_rate_bytes;
871                                                         timeout.tv_sec = 0;
872                                                         timeout.tv_nsec = emit_rate_delay * delay;
873                                                         while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
874                                                 }
875                                         }
876                                 }
877                                 else
878                                         if (peers[i].type == PEER_MIRROR) goto sendreal;
879                                         else
880                                                 if (peers[i].type == PEER_ROTATE) 
881                                                         if (peer_rot_cur++ == peer_rot_work) {
882 sendreal:
883                                                                 if (netflow->SeqOffset)
884                                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
885                                                                 ret = send(peers[i].write_fd, emit_packet, size, 0);
886                                                                 if (ret < size) {
887 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
888                                                                         my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
889                                                                                         i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
890 #endif
891                                                                 }
892 #if ((DEBUG) & DEBUG_E)
893                                                                 commaneelse {
894                                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
895                                                                                         emit_count, i + 1, peers[i].seq);
896                                                                 }
897 #endif
898                                                                 peers[i].seq += emit_count;
899
900                                                                 /* Rate limit */
901                                                                 if (emit_rate_bytes) {
902                                                                         sent += size;
903                                                                         delay = sent / emit_rate_bytes;
904                                                                         if (delay) {
905                                                                                 sent %= emit_rate_bytes;
906                                                                                 timeout.tv_sec = 0;
907                                                                                 timeout.tv_nsec = emit_rate_delay * delay;
908                                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
909                                                                         }
910                                                                 }
911                                                         }
912                         }
913                         if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
914                         emit_sequence += emit_count;
915                         emit_count = 0;
916 #if ((DEBUG) & DEBUG_I)
917                         emit_pkts++;
918 #endif
919                 }
920         }
921 }       
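/*
 * emit_thread() drains the flows_emit list and packs up to MaxFlows
 * records into emit_packet, flushing the PDU either when it is full or
 * after emit_timeout seconds.  The PDU is padded to NETFLOW_PDU_SIZE
 * when STD_NETFLOW_PDU is defined, then written to each peer: file
 * peers through get_data_file_fd()/write(), mirror and rotate peers
 * via send(), with PEER_ROTATE peers served round-robin.
 */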
922
923 void *unpending_thread()
924 {
925         struct timeval now;
926         struct timespec timeout;
927 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
928         char logbuf[256];
929 #endif
930
931         setuser();
932
933         timeout.tv_nsec = 0;
934         pthread_mutex_lock(&unpending_mutex);
935
936         for (;;) {
937                 while (!(pending_tail->flags & FLOW_PENDING)) {
938                         gettimeofday(&now, 0);
939                         timeout.tv_sec = now.tv_sec + unpending_timeout;
940                         pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
941                 }
942
943 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
944                 *logbuf = 0;
945 #endif
946                 if (put_into(pending_tail, COPY_INTO
947 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
948                                         , logbuf
949 #endif
950                             ) < 0) {
951 #if ((DEBUG) & DEBUG_I)
952                         pkts_lost_unpending++;
953 #endif                          
954                 }
955
956 #if ((DEBUG) & DEBUG_U)
957                 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
958 #endif
959
960                 pending_tail->flags = 0;
961                 pending_tail = pending_tail->next;
962 #if ((DEBUG) & DEBUG_I)
963                 pkts_pending_done++;
964 #endif
965         }
966 }
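/*
 * unpending_thread() moves packets parked on the pending ring by
 * cap_thread() into the flow hash via put_into(COPY_INTO), freeing the
 * ring slot for the capture thread.
 */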
967
968 void *scan_thread()
969 {
970 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
971         char logbuf[256];
972 #endif
973         int i;
974         struct Flow *flow, **flowpp;
975         struct Time now;
976         struct timespec timeout;
977
978         setuser();
979
980         timeout.tv_nsec = 0;
981         pthread_mutex_lock(&scan_mutex);
982
983         for (;;) {
984                 gettime(&now);
985                 timeout.tv_sec = now.sec + scan_interval;
986                 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
987
988                 gettime(&now);
989 #if ((DEBUG) & DEBUG_S)
990                 my_log(LOG_DEBUG, "S: %d", now.sec);
991 #endif
992                 for (i = 0; i < 1 << HASH_BITS ; i++) {
993                         pthread_mutex_lock(&flows_mutex[i]);
994                         flow = flows[i];
995                         flowpp = &flows[i];
996                         while (flow) {
997                                 if (flow->flags & FLOW_FRAG) {
998                                         /* Process fragmented flow */
999                                         if ((now.sec - flow->mtime.sec) > frag_lifetime) {
1000                                                 /* Fragmented flow expired - put it into special chain */
1001 #if ((DEBUG) & DEBUG_I)
1002                                                 flows_fragmented--;
1003                                                 flows_total--;
1004 #endif
1005                                                 *flowpp = flow->next;
1006                                                 flow->id = 0;
1007                                                 flow->flags &= ~FLOW_FRAG;
1008                                                 flow->next = scan_frag_dreg;
1009                                                 scan_frag_dreg = flow;
1010                                                 flow = *flowpp;
1011                                                 continue;
1012                                         }
1013                                 } else {
1014                                         /* Flow is not fragmented */
1015                                         if ((now.sec - flow->mtime.sec) > inactive_lifetime
1016                                                         || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
1017                                                 /* Flow expired */
1018 #if ((DEBUG) & DEBUG_S)
1019                                                 my_log(LOG_DEBUG, "S: E %x", flow);
1020 #endif
1021 #if ((DEBUG) & DEBUG_I)
1022                                                 flows_total--;
1023 #endif
1024                                                 *flowpp = flow->next;
1025                                                 pthread_mutex_lock(&emit_mutex);
1026                                                 flow->next = flows_emit;
1027                                                 flows_emit = flow;
1028 #if ((DEBUG) & DEBUG_I)
1029                                                 emit_queue++;
1030 #endif                          
1031                                                 pthread_mutex_unlock(&emit_mutex);
1032                                                 flow = *flowpp;
1033                                                 continue;
1034                                         }
1035                                 }
1036                                 flowpp = &flow->next;
1037                                 flow = flow->next;
1038                         } /* chain loop */
1039                         pthread_mutex_unlock(&flows_mutex[i]);
1040                 } /* hash loop */
1041                 if (flows_emit) pthread_cond_signal(&emit_cond);
1042
1043                 while (scan_frag_dreg) {
1044                         flow = scan_frag_dreg;
1045                         scan_frag_dreg = flow->next;
1046 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1047                         *logbuf = 0;
1048 #endif
1049                         put_into(flow, MOVE_INTO
1050 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1051                                         , logbuf
1052 #endif
1053                                 );
1054 #if ((DEBUG) & DEBUG_S)
1055                         my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
1056 #endif
1057                 }
1058         }
1059 }
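/*
 * scan_thread() wakes every scan_interval seconds, walks all hash
 * buckets, moves flows that exceeded the inactive or active lifetime
 * onto the flows_emit list, and recycles expired fragment chains back
 * through put_into(MOVE_INTO).
 */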
1060
1061 void *cap_thread()
1062 {
1063         struct ulog_packet_msg *ulog_msg;
1064         struct ip *nl;
1065         void *tl;
1066         struct Flow *flow;
1067         int len, off_frag, psize;
1068 #if ((DEBUG) & DEBUG_C)
1069         char buf[64];
1070         char logbuf[256];
1071 #endif
1072
1073         setuser();
1074
1075         while (!killed) {
1076                 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1077                 if (len <= 0) {
1078                         my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1079                         continue;
1080                 }
1081                 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1082
1083 #if ((DEBUG) & DEBUG_C)
1084                         sprintf(logbuf, "C: %d", ulog_msg->data_len);
1085 #endif
1086
1087                         nl = (void *) &ulog_msg->payload;
1088                         psize = ulog_msg->data_len;
1089
1090                         /* Sanity check */
1091                         if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1092 #if ((DEBUG) & DEBUG_C)
1093                                 strcat(logbuf, " U");
1094                                 my_log(LOG_DEBUG, "%s", logbuf);
1095 #endif
1096 #if ((DEBUG) & DEBUG_I)
1097                                 pkts_ignored++;
1098 #endif
1099                                 continue;
1100                         }
1101
1102                         if (pending_head->flags) {
1103 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1104                                 my_log(LOG_ERR,
1105 # if ((DEBUG) & DEBUG_C)
1106                                                 "%s %s %s", logbuf,
1107 # else
1108                                                 "%s %s",
1109 # endif
1110                                                 "pending queue full:", "packet lost");
1111 #endif
1112 #if ((DEBUG) & DEBUG_I)
1113                                 pkts_lost_capture++;
1114 #endif
1115                                 goto done;
1116                         }
1117
1118 #if ((DEBUG) & DEBUG_I)
1119                         pkts_total++;
1120 #endif
1121
1122                         flow = pending_head;
1123
1124                         /* ?FIXME? Add sanity check for ip_len? */
1125                         flow->size = ntohs(nl->ip_len);
1126 #if ((DEBUG) & DEBUG_I)
1127                         size_total += flow->size;
1128 #endif
1129
1130                         flow->sip = nl->ip_src;
1131                         flow->dip = nl->ip_dst;
1132                         flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1133                         if ((flow->dip.s_addr == inet_addr("64.34.177.39")) || (flow->sip.s_addr == inet_addr("64.34.177.39"))) {
1134                                 my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->tos);
1135                         }
1136                         flow->iif = snmp_index(ulog_msg->indev_name);
1137                         flow->oif = snmp_index(ulog_msg->outdev_name);
1138                         flow->proto = nl->ip_p;
1139                         flow->id = 0;
1140                         flow->tcp_flags = 0;
1141                         flow->pkts = 1;
1142                         flow->sizeF = 0;
1143                         flow->sizeP = 0;
1144                         /* Packets captured in the OUTPUT chain don't carry a valid timestamp */
1145                         if (ulog_msg->timestamp_sec) {
1146                                 flow->ctime.sec = ulog_msg->timestamp_sec;
1147                                 flow->ctime.usec = ulog_msg->timestamp_usec;
1148                         } else gettime(&flow->ctime);
1149                         flow->mtime = flow->ctime;
1150
1151                         off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1152
1153                         /*
1154                            Offset (from network layer) to transport layer header/IP data
1155                            IOW IP header size ;-)
1156
1157                            ?FIXME?
1158                            Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1159                            */
1160                         off_tl = nl->ip_hl << 2;
1161                         tl = (void *) nl + off_tl;
1162
1163                         /* THIS packet's data size: data_size = total_size - IP header length (ip_hl * 4) */
1164                         flow->sizeF = ntohs(nl->ip_len) - off_tl;
1165                         psize -= off_tl;
1166                         if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1167                         if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1168
1169                         if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1170                                 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1171 #if ((DEBUG) & DEBUG_C)
1172                                 strcat(logbuf, " F");
1173 #endif
1174 #if ((DEBUG) & DEBUG_I)
1175                                 pkts_total_fragmented++;
1176 #endif
1177                                 flow->flags |= FLOW_FRAG;
1178                                 flow->id = nl->ip_id;
1179
1180                                 if (!(ntohs(nl->ip_off) & IP_MF)) {
1181                                         /* A fragment with IP_MF cleared is the last one and tells us the whole datagram size */
1182                                         flow->flags |= FLOW_LASTFRAG;
1183                                         /* size = frag_offset*8 + data_size */
1184                                         flow->sizeP = off_frag + flow->sizeF;
1185                                 }
1186                         }
1187
1188 #if ((DEBUG) & DEBUG_C)
1189                         sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1190                         strcat(logbuf, buf);
1191                         sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1192                         strcat(logbuf, buf);
1193 #endif
1194
1195                         /*
1196                            Fortunately, the most interesting transport layer information fits
1197                            into the first 8 bytes of the IP data field (the minimal nonzero
1198                            fragment size), so we don't need actual packet reassembly to get
1199                            the transport layer header. We only look for a zero fragment
1200                            offset to find the packet that carries this information.
1201                            */
1202                         if (!off_frag && psize >= 8) {
1203                                 switch (flow->proto) {
1204                                         case IPPROTO_TCP:
1205                                         case IPPROTO_UDP:
1206                                                 flow->sp = ((struct udphdr *)tl)->uh_sport;
1207                                                 flow->dp = ((struct udphdr *)tl)->uh_dport;
1208                                                 goto tl_known;
1209
1210 #ifdef ICMP_TRICK
1211                                         case IPPROTO_ICMP:
1212                                                 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1213                                                 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1214                                                 goto tl_known;
1215 #endif
1216 #ifdef ICMP_TRICK_CISCO
1217                                         case IPPROTO_ICMP:
1218                                                 flow->dp = *((int32_t *) tl);
1219                                                 goto tl_known;
1220 #endif
1221
1222                                         default:
1223                                                 /* Unknown transport layer */
1224 #if ((DEBUG) & DEBUG_C)
1225                                                 strcat(logbuf, " U");
1226 #endif
1227                                                 flow->sp = 0;
1228                                                 flow->dp = 0;
1229                                                 break;
1230
1231 tl_known:
1232 #if ((DEBUG) & DEBUG_C)
1233                                                 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1234                                                 strcat(logbuf, buf);
1235 #endif
1236                                                 flow->flags |= FLOW_TL;
1237                                 }
1238                         }
1239
1240                         /* Check for tcp flags presence (including CWR and ECE). */
1241                         if (flow->proto == IPPROTO_TCP
1242                                         && off_frag < 16
1243                                         && psize >= 16 - off_frag) {
1244                                 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1245 #if ((DEBUG) & DEBUG_C)
1246                                 sprintf(buf, " TCP:%x", flow->tcp_flags);
1247                                 strcat(logbuf, buf);
1248 #endif
1249                         }
1250
1251 #if ((DEBUG) & DEBUG_C)
1252                         sprintf(buf, " => %x", (unsigned) flow);
1253                         strcat(logbuf, buf);
1254                         my_log(LOG_DEBUG, "%s", logbuf);
1255 #endif
1256
1257 #if ((DEBUG) & DEBUG_I)
1258                         pkts_pending++;
1259                         pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1260                         if (pending_queue_trace < pending_queue_trace_candidate)
1261                                 pending_queue_trace = pending_queue_trace_candidate;
1262 #endif
1263
1264                         /* Flow complete - inform unpending_thread() about it */
1265                         pending_head->flags |= FLOW_PENDING;
1266                         pending_head = pending_head->next;
1267 done:
1268                         pthread_cond_signal(&unpending_cond);
1269                 }
1270         }
1271         return 0;
1272 }
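/*
 * cap_thread() reads packets from the ULOG netlink socket, fills the
 * next free slot of the pending ring (addresses, ports, ToS/mark,
 * fragment bookkeeping, TCP flags) and hands it to unpending_thread()
 * by setting FLOW_PENDING and signalling unpending_cond.
 */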
1273
1274 /* Copied out of CoDemux */
1275
1276 static int init_daemon() {
1277         pid_t pid;
1278         FILE *pidfile;
1279
1280         pidfile = fopen(PIDFILE, "w");
1281         if (pidfile == NULL) {
1282                 my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
                return(-1);
1283         }
1284
1285         if ((pid = fork()) < 0) {
1286                 fclose(pidfile);
1287                 my_log(LOG_ERR, "Could not fork!\n");
1288                 return(-1);
1289         }
1290         else if (pid != 0) {
1291                 /* I'm the parent: write down the child pid */
1292                 fprintf(pidfile, "%u\n", pid);
1293                 fclose(pidfile);
1294                 exit(0);
1295         }
1296
1297         /* close the pid file */
1298         fclose(pidfile);
1299
1300         /* standard daemonization steps:
1301            1. create a new session
1302            2. change the working directory (to /var/local/fprobe here)
1303            3. clear the file creation mask
1304            */
1305         setsid();
1306         chdir("/var/local/fprobe");
1307         umask(0);
1308
1309         return(0);
1310 }
1311
1312 int main(int argc, char **argv)
1313 {
1314         char errpbuf[512];
1315         char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1316         int c, i, write_fd, memory_limit = 0;
1317         struct addrinfo hints, *res;
1318         struct sockaddr_in saddr;
1319         pthread_attr_t tattr;
1320         struct sigaction sigact;
1321         static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1322         struct timeval timeout;
1323
1324         sched_min = sched_get_priority_min(SCHED);
1325         sched_max = sched_get_priority_max(SCHED);
1326
1327         memset(&saddr, 0 , sizeof(saddr));
1328         memset(&hints, 0 , sizeof(hints));
1329         hints.ai_flags = AI_PASSIVE;
1330         hints.ai_family = AF_INET;
1331         hints.ai_socktype = SOCK_DGRAM;
1332
1333         /* Process command line options */
1334
1335         opterr = 0;
1336         while ((c = my_getopt(argc, argv, parms)) != -1) {
1337                 switch (c) {
1338                         case '?':
1339                                 usage();
1340
1341                         case 'h':
1342                                 usage();
1343                 }
1344         }
1345
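        /* Plain numeric options are copied straight into their globals with atoi();
           the more structured ones (-n, -l, -X, collector specs) are parsed further down. */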
1346         if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1347         if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1348         if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1349         if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1350         if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1351         if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1352         if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1353         if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1354         if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1355         if (parms[nflag].count) {
1356                 switch (atoi(parms[nflag].arg)) {
1357                         case 1:
1358                                 netflow = &NetFlow1;
1359                                 break;
1360
1361                         case 5:
1362                                 break;
1363
1364                         case 7:
1365                                 netflow = &NetFlow7;
1366                                 break;
1367
1368                         default:
1369                                 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1370                                 exit(1);
1371                 }
1372         }
1373         if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1374         if (parms[lflag].count) {
1375                 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1376                         *log_suffix++ = 0;
1377                         if (*log_suffix) {
1378                                 sprintf(errpbuf, "[%s]", log_suffix);
1379                                 strcat(ident, errpbuf);
1380                         }
1381                 }
1382                 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1383                 if (log_suffix) *--log_suffix = ':';
1384         }
1385         if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1386 err_malloc:
1387                 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1388                 exit(1);
1389         }
1390         sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1391         if (parms[qflag].count) {
1392                 pending_queue_length = atoi(parms[qflag].arg);
1393                 if (pending_queue_length < 1) {
1394                         fprintf(stderr, "Illegal %s\n", "pending queue length");
1395                         exit(1);
1396                 }
1397         }
1398         if (parms[rflag].count) {
1399                 schedp.sched_priority = atoi(parms[rflag].arg);
1400                 if (schedp.sched_priority
1401                                 && (schedp.sched_priority < sched_min
1402                                         || schedp.sched_priority > sched_max)) {
1403                         fprintf(stderr, "Illegal %s\n", "realtime priority");
1404                         exit(1);
1405                 }
1406         }
1407         if (parms[Bflag].count) {
1408                 sockbufsize = atoi(parms[Bflag].arg) << 10;
1409         }
1410         if (parms[bflag].count) {
1411                 bulk_quantity = atoi(parms[bflag].arg);
1412                 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1413                         fprintf(stderr, "Illegal %s\n", "bulk size");
1414                         exit(1);
1415                 }
1416         }
1417         if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
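        /* -X supplies SNMP ifIndex rules of the form <ifbase>:<base>[,<ifbase>:<base>...]
           (e.g. "eth:1,ppp:200" as an illustration); the loop below alternates strtok()
           on ':' and ',' and patches the separators back afterwards so ps still shows
           the original command line. */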
1418         if (parms[Xflag].count) {
1419                 for(i = 0; parms[Xflag].arg[i]; i++)
1420                         if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1421                 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1422                         goto err_malloc;
1423                 rule = strtok(parms[Xflag].arg, ":");
1424                 for (i = 0; rule; i++) {
1425                         snmp_rules[i].len = strlen(rule);
1426                         if (snmp_rules[i].len > IFNAMSIZ) {
1427                                 fprintf(stderr, "Illegal %s\n", "interface basename");
1428                                 exit(1);
1429                         }
1430                         strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1431                         if (!*(rule - 1)) *(rule - 1) = ',';
1432                         rule = strtok(NULL, ",");
1433                         if (!rule) {
1434                                 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1435                                 exit(1);
1436                         }
1437                         snmp_rules[i].base = atoi(rule);
1438                         *(rule - 1) = ':';
1439                         rule = strtok(NULL, ":");
1440                 }
1441                 nsnmp_rules = i;
1442         }
1443         if (parms[tflag].count)
1444                 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1445         if (parms[aflag].count) {
1446                 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1447 bad_lhost:
1448                         fprintf(stderr, "Illegal %s\n", "source address");
1449                         exit(1);
1450                 } else {
1451                         saddr = *((struct sockaddr_in *) res->ai_addr);
1452                         freeaddrinfo(res);
1453                 }
1454         }
1455         if (parms[uflag].count) 
1456                 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1457                         fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1458                         exit(1);
1459                 }
1460
1461
1462         /* Process collectors parameters. Brrrr... :-[ */
1463
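        /*
         * Each remaining command-line argument is one collector spec:
         *     host:port[/local_address[/type]]
         * with type 'm' = mirror (default) or 'r' = rotate.  With no collector
         * arguments, -f <file> switches to local file logging instead.
         */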
1464         npeers = argc - optind;
1465         if (npeers >= 1) {
1466                 /* Send to remote Netflow collector */
1467                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1468                 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1469                         dhost = argv[i];
                        type = 0;       /* forget the '/type' suffix seen on a previous collector argument */
1470                         if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1471                         *dport++ = 0;
1472                         if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1473                                 fprintf(stderr, "socket(): %s\n", strerror(errno));
1474                                 exit(1);
1475                         }
1476                         peers[npeers].write_fd = write_fd;
1477                         peers[npeers].type = PEER_MIRROR;
1478                         peers[npeers].laddr = saddr;
1479                         peers[npeers].seq = 0;
1480                         if ((lhost = strchr(dport, '/'))) {
1481                                 *lhost++ = 0;
1482                                 if ((type = strchr(lhost, '/'))) {
1483                                         *type++ = 0;
1484                                         switch (*type) {
1485                                                 case 0:
1486                                                 case 'm':
1487                                                         break;
1488
1489                                                 case 'r':
1490                                                         peers[npeers].type = PEER_ROTATE;
1491                                                         npeers_rot++;
1492                                                         break;
1493
1494                                                 default:
1495                                                         goto bad_collector;
1496                                         }
1497                                 }
1498                                 if (*lhost) {
1499                                         if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1500                                         peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1501                                         freeaddrinfo(res);
1502                                 }
1503                         }
1504                         if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1505                                                 sizeof(struct sockaddr_in))) {
1506                                 fprintf(stderr, "bind(): %s\n", strerror(errno));
1507                                 exit(1);
1508                         }
1509                         if (getaddrinfo(dhost, dport, &hints, &res)) {
1510 bad_collector:
1511                                 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1512                                 exit(1);
1513                         }
1514                         peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1515                         freeaddrinfo(res);
1516                         if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1517                                                 sizeof(struct sockaddr_in))) {
1518                                 fprintf(stderr, "connect(): %s\n", strerror(errno));
1519                                 exit(1);
1520                         }
1521
1522                         /* Restore command line */
1523                         if (type) *--type = '/';
1524                         if (lhost) *--lhost = '/';
1525                         *--dport = ':';
1526                 }
1527         }
1528         else if (parms[fflag].count) {
1529                 /* log into a file */
1530                 if (!(peers = malloc((npeers + 1) * sizeof(struct peer)))) goto err_malloc;
1531                 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg, MAX_PATH_LEN) + 1))) goto err_malloc;
1532                 snprintf(peers[npeers].fname, strnlen(parms[fflag].arg, MAX_PATH_LEN) + 1, "%s", parms[fflag].arg);
1533
1534                 peers[npeers].write_fd = START_DATA_FD;
1535                 peers[npeers].type = PEER_FILE;
1536                 peers[npeers].seq = 0;
1537
1538                 read_cur_epoch();
1539                 npeers++;
1540         }
1541         else 
1542                 usage();
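        /*
         * Illustrative invocations (addresses and names are placeholders):
         *     fprobe-ulog 10.0.0.1:2055      export over UDP to one remote collector
         *     fprobe-ulog -f netflow_dump    write epoch-rotated files locally
         */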
1543
1544
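        /* Packets come in over a ULOG netlink socket rather than libpcap: create the
           handle for the configured group mask and, with -B, enlarge its receive buffer. */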
1545         if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1546         ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1547         if (!ulog_handle) {
1548                 fprintf(stderr, "libipulog initialization error: %s\n",
1549                                 ipulog_strerror(ipulog_errno));
1550                 exit(1);
1551         }
1552         if (sockbufsize)
1553                 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1554                                         &sockbufsize, sizeof(sockbufsize)) < 0)
1555                         fprintf(stderr, "setsockopt(): %s\n", strerror(errno));
1556
1557         /* Daemonize (if log destination stdout-free) */
1558
1559         my_log_open(ident, verbosity, log_dest);
1560
1561         init_daemon();
1562
1563         if (!(log_dest & 2)) {
1564                 /* Crash-proofing (Sapan): the parent re-forks the worker child whenever it dies */
1565                 while (1) {
1566                         int pid=fork();
1567                         if (pid==-1) {
1568                                 fprintf(stderr, "fork(): %s\n", strerror(errno));
1569                                 exit(1);
1570                         }
1571                         else if (pid==0) {
1572                                 setsid();
1573                                 freopen("/dev/null", "r", stdin);
1574                                 freopen("/dev/null", "w", stdout);
1575                                 freopen("/dev/null", "w", stderr);
1576                                 break;
1577                         }
1578                         else {
1579                                 while (wait3(NULL,0,NULL) < 1);
1580                         }
1581                 }
1582         } else {
1583                 setvbuf(stdout, (char *)0, _IONBF, 0);
1584                 setvbuf(stderr, (char *)0, _IONBF, 0);
1585         }
1586
1587         pid = getpid();
1588         sprintf(errpbuf, "[%ld]", (long) pid);
1589         strcat(ident, errpbuf);
1590
1591         /* Initialization */
1592
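        /* Flow storage: a preallocated pool of struct Flow (mem_init) plus one
           mutex per hash bucket. */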
1593         hash_init(); /* Actually for crc16 only */
1594         mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1595         for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1596
1597 #ifdef UPTIME_TRICK
1598         /* Hope 12 days is enough :-/ */
1599         start_time_offset = 1 << 20;
1600
1601         /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1602 #endif
1603         gettime(&start_time);
1604
1605         /*
1606            Build static pending queue as circular buffer.
1607            */
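        /* The capture path fills the ring at pending_head, marks entries FLOW_PENDING
           and signals unpending_cond; unpending_thread() consumes them starting at
           pending_tail. */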
1608         if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1609         pending_tail = pending_head;
1610         for (i = pending_queue_length - 1; i--;) {
1611                 if (!(pending_tail->next = mem_alloc())) {
1612 err_mem_alloc:
1613                         my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1614                         exit(1);
1615                 }
1616                 pending_tail = pending_tail->next;
1617         }
1618         pending_tail->next = pending_head;
1619         pending_tail = pending_head;
1620
1621         sigemptyset(&sig_mask);
1622         sigact.sa_handler = &sighandler;
1623         sigact.sa_mask = sig_mask;
1624         sigact.sa_flags = 0;
1625         sigaddset(&sig_mask, SIGTERM);
1626         sigaction(SIGTERM, &sigact, 0);
1627 #if ((DEBUG) & DEBUG_I)
1628         sigaddset(&sig_mask, SIGUSR1);
1629         sigaction(SIGUSR1, &sigact, 0);
1630 #endif
1631         if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1632                 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1633                 exit(1);
1634         }
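        /* SIGTERM (and SIGUSR1 in DEBUG_I builds) stays blocked while the worker
           threads are created below; the main thread unblocks the mask again right
           before its main loop, so it is the only thread that receives these signals. */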
1635
1636         my_log(LOG_INFO, "Starting %s...", VERSION);
1637
1638         if (parms[cflag].count) {
1639                 if (chdir(parms[cflag].arg) || chroot(".")) {
1640                         my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1641                         exit(1);
1642                 }
1643         }
1644
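        /* Spawn the worker threads (emit, scan, unpending, cap).  When -r sets a
           real-time priority, each thread gets a successively higher one, so the
           capture thread, created last, runs at the highest priority. */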
1645         schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1646         pthread_attr_init(&tattr);
1647         for (i = 0; i < THREADS - 1; i++) {
1648                 if (schedp.sched_priority > 0) {
1649                         if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1650                                         (pthread_attr_setschedparam(&tattr, &schedp))) {
1651                                 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1652                                 exit(1);
1653                         }
1654                 }
1655                 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1656                         my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1657                         exit(1);
1658                 }
1659                 pthread_detach(thid);
1660                 schedp.sched_priority++;
1661         }
1662
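        /* If -u was given, drop privileges: supplementary groups first, then the
           gid, then the uid, so the uid change cannot block the earlier steps. */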
1663         if (pw) {
1664                 if (setgroups(0, NULL)) {
1665                         my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1666                         exit(1);
1667                 }
1668                 if (setregid(pw->pw_gid, pw->pw_gid)) {
1669                         my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1670                         exit(1);
1671                 }
1672                 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1673                         my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1674                         exit(1);
1675                 }
1676         }
1677
1678         if (!(pidfile = fopen(pidfilepath, "w")))
1679                 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1680         else {
1681                 fprintf(pidfile, "%ld\n", (long) pid);
1682                 fclose(pidfile);
1683         }
1684
1685         my_log(LOG_INFO, "pid: %d", pid);
1686         my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1687                         "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1688                         ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1689                         netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1690                         memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1691                         emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1692                         parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1693         for (i = 0; i < nsnmp_rules; i++) {
1694                 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1695                                 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1696         }
1697         for (i = 0; i < npeers; i++) {
1698                 switch (peers[i].type) {
1699                         case PEER_MIRROR:
1700                                 c = 'm';
1701                                 break;
1702                         case PEER_ROTATE:
1703                                 c = 'r';
1704                                 break;
                        case PEER_FILE:
                                /* file peers carry no socket addresses - log the target file instead */
                                my_log(LOG_INFO, "collector #%d: file %s", i + 1, peers[i].fname);
                                continue;
                        default:
                                c = '?';
                                break;
1705                 }
1706                 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1707                 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1708                                 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1709         }
1710
1711         pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1712
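        /* Main loop: sleep for up to scan_interval between signal checks and keep
           going until SIGTERM has been handled and every flow has been flushed
           (no busy elements, no queued emits, nothing left pending). */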
1713         timeout.tv_usec = 0;
1714         while (!killed
1715                         || (total_elements - free_elements - pending_queue_length)
1716                         || emit_count
1717                         || pending_tail->flags) {
1718
1719                 if (!sigs) {
1720                         timeout.tv_sec = scan_interval;
1721                         select(0, 0, 0, 0, &timeout);
1722                 }
1723
1724                 if (sigs & SIGTERM_MASK && !killed) {
1725                         sigs &= ~SIGTERM_MASK;
1726                         my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1727                         scan_interval = 1;
1728                         frag_lifetime = -1;
1729                         active_lifetime = -1;
1730                         inactive_lifetime = -1;
1731                         emit_timeout = 1;
1732                         unpending_timeout = 1;
1733                         killed = 1;
1734                         pthread_cond_signal(&scan_cond);
1735                         pthread_cond_signal(&unpending_cond);
1736                 }
1737
1738 #if ((DEBUG) & DEBUG_I)
1739                 if (sigs & SIGUSR1_MASK) {
1740                         sigs &= ~SIGUSR1_MASK;
1741                         info_debug();
1742                 }
1743 #endif
1744         }
1745         remove(pidfilepath);
1746 #if ((DEBUG) & DEBUG_I)
1747         info_debug();
1748 #endif
1749         my_log(LOG_INFO, "Done.");
1750 #ifdef WALL
1751         return 0;
1752 #endif
1753 }