[fprobe-ulog.git] / src / fprobe-ulog.c
1 /*
2         Copyright (C) Slava Astashonok <sla@0n.ru>
3
4         This program is free software; you can redistribute it and/or
5         modify it under the terms of the GNU General Public License.
6
7         $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
8
9                         Sapan Bhatia <sapanb@cs.princeton.edu> 
10                         
11         7/11/2007       Added data collection (-f) functionality, xid support in the header and log file
12                         rotation.
13
14         15/11/2007      Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
15
16 */
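/*
        Packets reach this probe through the netfilter ULOG target: the kernel
        copies them to the netlink group(s) selected by the -U bitmask.  As an
        illustrative example only (the exact chains and group numbers depend on
        the deployment), a rule along the lines of

                iptables -A FORWARD -j ULOG --ulog-nlgroup 1

        feeds packets to the default group 1.
*/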
17
18 #include <common.h>
19
20 /* stdout, stderr, freopen() */
21 #include <stdio.h>
22
23 /* atoi(), exit() */
24 #include <stdlib.h>
25
26 /* getopt(), alarm(), getpid(), setsid(), chdir() */
27 #include <unistd.h>
28
29 /* strerror() */
30 #include <string.h>
31
32 /* sig*() */
33 #include <signal.h>
34
35 /* statfs(), fstatfs() */
36
37 #include <sys/vfs.h>
38
39 #include <libipulog/libipulog.h>
40 #include "vserver.h"
41
42 struct ipulog_handle {
43         int fd;
44         u_int8_t blocking;
45         struct sockaddr_nl local;
46         struct sockaddr_nl peer;
47         struct nlmsghdr* last_nlhdr;
48 };
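/* NOTE: this mirrors libipulog's internal handle layout (normally opaque to
 * callers), presumably so the probe can reach the underlying netlink socket
 * fields directly; it has to be kept in sync with the library headers. */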
49
50 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
51 #include <sys/types.h>
52 #include <netinet/in_systm.h>
53 #include <sys/socket.h>
54 #include <netinet/in.h>
55 #include <arpa/inet.h>
56 #include <netinet/ip.h>
57 #include <netinet/tcp.h>
58 #include <netinet/udp.h>
59 #include <netinet/ip_icmp.h>
60 #include <net/if.h>
61
62 #include <sys/param.h>
63 #include <pwd.h>
64 #ifdef OS_LINUX
65 #include <grp.h>
66 #endif
67
68 /* pthread_*() */
69 #include <pthread.h>
70
71 /* errno */
72 #include <errno.h>
73
74 /* getaddrinfo() */
75 #include <netdb.h>
76
77 /* nanosleep() */
78 #include <time.h>
79
80 /* gettimeofday() */
81 #include <sys/time.h>
82
83 /* scheduling */
84 #include <sched.h>
85
86 /* select() (POSIX)*/
87 #include <sys/select.h>
88
89 /* open() */
90 #include <sys/stat.h>
91 #include <fcntl.h>
92
93 #include <fprobe-ulog.h>
94 #include <my_log.h>
95 #include <my_getopt.h>
96 #include <netflow.h>
97 #include <hash.h>
98 #include <mem.h>
99
100 #define PIDFILE                 "/var/log/fprobe-ulog.pid"
101 #define LAST_EPOCH_FILE         "/var/log/fprobe_last_epoch"
102 #define MAX_EPOCH_SIZE          sizeof("32767")
103 #define STD_NETFLOW_PDU
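/* When STD_NETFLOW_PDU is defined, emit_thread() pads every outgoing PDU up
 * to NETFLOW_PDU_SIZE before writing/sending it (see the padding note there). */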
104
105 enum {
106         aflag,
107         Bflag,
108         bflag,
109         cflag,
110         dflag,
111         Dflag,
112         eflag,
113         Eflag,
114         fflag,
115         gflag,
116         hflag,
117         lflag,
118         mflag,
119         Mflag,
120         nflag,
121         qflag,
122         rflag,
123         sflag,
124         tflag,
125         Tflag,
126         Uflag,
127         uflag,
128         vflag,
129         Wflag,
130         Xflag,
131 };
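/* Option indices: these must stay in the same order as the parms[] table
 * below, since options are looked up by index, e.g. parms[Uflag].count. */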
132
133 static struct getopt_parms parms[] = {
134         {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
135         {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
136         {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
137         {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
138         {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
139         {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
140         {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
141         {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
142         {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
143         {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
144         {'h', 0, 0, 0},
145         {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
146         {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
147         {'M', 0, 0, 0},
148         {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
149         {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
150         {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
151         {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
152         {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
153         {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
154         {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
155         {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
156         {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
157         {'W', MY_GETOPT_ARG_REQUIRED, 0, 0},
158         {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
159         {0, 0, 0, 0}
159 };
160
161 extern char *optarg;
162 extern int optind, opterr, optopt;
163 extern int errno;
164
165 extern struct NetFlow NetFlow1;
166 extern struct NetFlow NetFlow5;
167 extern struct NetFlow NetFlow7;
168
169 #define START_DATA_FD -5
170 #define mark_is_tos parms[Mflag].count
171 static unsigned scan_interval = 5;
172 static unsigned int min_free = 0;
173 static int frag_lifetime = 30;
174 static int inactive_lifetime = 60;
175 static int active_lifetime = 300;
176 static int sockbufsize;
177 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
178 #if (MEM_BITS == 0) || (MEM_BITS == 16)
179 #define BULK_QUANTITY 10000
180 #else
181 #define BULK_QUANTITY 200
182 #endif
183
184 static unsigned epoch_length=60, log_epochs=1; 
185 static unsigned cur_epoch=0,prev_uptime=0,last_peak=0;
186
187 static unsigned bulk_quantity = BULK_QUANTITY;
188 static unsigned pending_queue_length = 100;
189 static struct NetFlow *netflow = &NetFlow5;
190 static unsigned verbosity = 6;
191 static unsigned log_dest = MY_LOG_SYSLOG;
192 static struct Time start_time;
193 static long start_time_offset;
194 static int off_tl;
195 /* From mem.c */
196 extern unsigned total_elements;
197 extern unsigned free_elements;
198 extern unsigned total_memory;
199 #if ((DEBUG) & DEBUG_I)
200 static unsigned emit_pkts, emit_queue;
201 static uint64_t size_total;
202 static unsigned pkts_total, pkts_total_fragmented;
203 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
204 static unsigned pkts_pending, pkts_pending_done;
205 static unsigned pending_queue_trace, pending_queue_trace_candidate;
206 static unsigned flows_total, flows_fragmented;
207 #endif
208 static unsigned emit_count;
209 static uint32_t emit_sequence;
210 static unsigned emit_rate_bytes, emit_rate_delay;
211 static struct Time emit_time;
212 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
213 static pthread_t thid;
214 static sigset_t sig_mask;
215 static struct sched_param schedp;
216 static int sched_min, sched_max;
217 static int npeers, npeers_rot;
218 static struct peer *peers;
219 static int sigs;
220
221 static struct Flow *flows[1 << HASH_BITS];
222 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
223
224 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
225 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
226
227 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
228 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
229 static struct Flow *pending_head, *pending_tail;
230 static struct Flow *scan_frag_dreg;
231
232 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
233 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
234 static struct Flow *flows_emit;
235
236 static char ident[256] = "fprobe-ulog";
237 static FILE *pidfile;
238 static char *pidfilepath;
239 static pid_t pid;
240 static int killed;
241 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
242 static struct ipulog_handle *ulog_handle;
243 static uint32_t ulog_gmask = 1;
244 static char *cap_buf;
245 static int nsnmp_rules;
246 static struct snmp_rule *snmp_rules;
247 static struct passwd *pw = 0;
248
249 void usage()
250 {
251         fprintf(stdout,
252                         "fprobe-ulog: a NetFlow probe. Version %s\n"
253                         "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
254                         "\n"
255                         "-h\t\tDisplay this help\n"
256                         "-U <mask>\tULOG group bitwise mask [1]\n"
257                         "-s <seconds>\tHow often to scan for expired flows [5]\n"
258                         "-g <seconds>\tFragmented flow lifetime [30]\n"
259                         "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
260                         "-f <filename>\tLog flow data in a file\n"
261                         "-G <graduation period>\tRotate logs hourly (0) or daily (1)\n"
262                         "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
263                         "-a <address>\tUse <address> as source for NetFlow flow\n"
264                         "-X <rules>\tInterface name to SNMP-index conversion rules\n"
265                         "-M\t\tUse netfilter mark value as ToS flag\n"
266                         "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
267                         "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
268                         "-q <flows>\tPending queue length [100]\n"
269                         "-B <kilobytes>\tKernel capture buffer size [0]\n"
270                         "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
271                         "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
272                         "-c <directory>\tDirectory to chroot to\n"
273                         "-u <user>\tUser to run as\n"
274                         "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
275                         "-l <[dst][:id]>\tLog destination and log/pidfile identifier [1]\n"
276                         "-y <remote:port>\tAddress of the NetFlow collector\n"
277                         "-f <writable file>\tFile to write data into\n"
278                         "-T <n>\tRotate log file every n epochs\n"
279                         "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
280                         "-E <[1..60]>\tSize of an epoch in minutes\n"
281                         "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
282                         ,
283                 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
284         exit(0);
285 }
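/*
 * Illustrative invocation (hypothetical paths and values): emit NetFlow v5
 * records into /var/local/fprobe/data.<epoch> with 5-minute epochs, cycling
 * through 12 log files and listening on ULOG group 1:
 *
 *      fprobe-ulog -U 1 -n 5 -f /var/local/fprobe/data -E 5 -T 12
 */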
286
287 #if ((DEBUG) & DEBUG_I)
288 void info_debug()
289 {
290         my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
291                         pkts_total, pkts_total_fragmented, size_total,
292                         pkts_pending - pkts_pending_done, pending_queue_trace);
293         my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
294                         pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
295         my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
296                         flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
297         my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
298                         total_elements, free_elements, total_memory);
299 }
300 #endif
301
302 void sighandler(int sig)
303 {
304         switch (sig) {
305                 case SIGTERM:
306                         sigs |= SIGTERM_MASK;
307                         break;
308 #if ((DEBUG) & DEBUG_I)
309                 case SIGUSR1:
310                         sigs |= SIGUSR1_MASK;
311                         break;
312 #endif
313         }
314 }
315
316 void gettime(struct Time *now)
317 {
318         struct timeval t;
319
320         gettimeofday(&t, 0);
321         now->sec = t.tv_sec;
322         now->usec = t.tv_usec;
323 }
324
325
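/* Timestamp difference in whole minutes */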
326 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
327 {
328         return (t1->sec - t2->sec)/60;
329 }
330
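/* Timestamp difference in milliseconds */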
331 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
332 {
333         return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
334 }
335
336 /* Uptime in milliseconds */
337 uint32_t getuptime(struct Time *t)
338 {
339         /* Maximum uptime is about 49/2 days */
340         return cmpmtime(t, &start_time);
341 }
342
343 /* Uptime in minutes */
344 uint32_t getuptime_minutes(struct Time *t)
345 {
346         /* At minute granularity, wraparound is not a practical concern */
347         return cmpMtime(t, &start_time);
348 }
349
350 hash_t hash_flow(struct Flow *flow)
351 {
352         if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
353         else return hash(flow, sizeof(struct Flow_TL));
354 }
355
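/* Map an interface name to an SNMP-style index: first via the -X conversion
 * rules, then via if_nametoindex(), otherwise (uint16_t)-1. */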
356 uint16_t snmp_index(char *name) {
357         uint32_t i;
358
359         if (!*name) return 0;
360
361         for (i = 0; (int) i < nsnmp_rules; i++) {
362                 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
363                 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
364         }
365
366         if ((i = if_nametoindex(name))) return i;
367
368         return -1;
369 }
370
371 inline void copy_flow(struct Flow *src, struct Flow *dst)
372 {
373         dst->iif = src->iif;
374         dst->oif = src->oif;
375         dst->sip = src->sip;
376         dst->dip = src->dip;
377         dst->tos = src->tos;
378         dst->xid = src->xid;
379         dst->proto = src->proto;
380         dst->tcp_flags = src->tcp_flags;
381         dst->id = src->id;
382         dst->sp = src->sp;
383         dst->dp = src->dp;
384         dst->pkts = src->pkts;
385         dst->size = src->size;
386         dst->sizeF = src->sizeF;
387         dst->sizeP = src->sizeP;
388         dst->ctime = src->ctime;
389         dst->mtime = src->mtime;
390         dst->flags = src->flags;
391 }
392
393 void read_cur_epoch() {
394         int fd;
395         /* Reset to -1 in case the read fails */
396         cur_epoch=-1;
397         fd = open(LAST_EPOCH_FILE, O_RDONLY);
398         if (fd != -1) {
399                 char snum[MAX_EPOCH_SIZE];
400                 ssize_t len;
401                 len = read(fd, snum, MAX_EPOCH_SIZE-1);
402                 if (len != -1) {
403                         snum[len]='\0';
404                         sscanf(snum,"%d",&cur_epoch);
405                         cur_epoch++; /* Don't clobber the last completed epoch */
406                         close(fd);
407                 }
408         }
409         return;
410 }
411
412
413 /* Dumps the current epoch in a file to cope with
414  * reboots and killings of fprobe */
415
416 void update_cur_epoch_file(int n) {
417         int fd, len;
418         char snum[MAX_EPOCH_SIZE];
419         len=snprintf(snum, MAX_EPOCH_SIZE-1,"%d", n);
420         fd = open(LAST_EPOCH_FILE, O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRGRP|S_IROTH);
421         if (fd == -1) {
422                 my_log(LOG_ERR, "open() failed: %s. The next restart will resume logging from epoch id 0.", LAST_EPOCH_FILE);
423                 return;
424         }
425         write(fd, snum, len);
426         close(fd);
427 }
428
429 /* Get the file descriptor corresponding to the current file.
430  * The kludgy implementation is to abstract away the 'current
431  * file descriptor', which may also be a socket. 
432  */
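/* Data files are named "<fname>.<epoch>" and the epoch suffix wraps modulo the
 * -T setting, so e.g. "-f /some/path/data -T 3" (hypothetical path) cycles
 * through data.0, data.1 and data.2. */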
433
434 unsigned get_data_file_fd(char *fname, int cur_fd) {
435         struct Time now;
436         unsigned cur_uptime;
437
438         struct statfs statfs;
439         int ret_fd;
440
441         /* If the free space left on the disk drops below the threshold, start reusing our own logs,
442          * or bail out if that doesn't solve the problem */
443         gettime(&now);
444         cur_uptime = getuptime_minutes(&now);
445
446         if (cur_fd != START_DATA_FD) {
447                 if (fstatfs(cur_fd, &statfs) == -1) {
448                         my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
449                 }
450                 else {
451                         if (min_free && (statfs.f_bavail < min_free)
452                                         && (cur_epoch==last_peak))
453                         {
454                                 my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
455                                 cur_epoch = -1;
456                         }
457                         /*
458                            else 
459                            assume that we can reclaim space by overwriting our own files
460                            and that the difference in size will not fill the disk - sapan
461                            */
462                 }
463         }
464
465         /* If epoch length has been exceeded, 
466          * or we're starting up 
467          * or we're going back to the first epoch */
468         if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
469                 char nextname[MAX_PATH_LEN];
470                 int write_fd;
471                 prev_uptime = cur_uptime;
472                 cur_epoch = (cur_epoch + 1) % log_epochs;
473                 if (cur_epoch>last_peak) last_peak = cur_epoch;
474                 if (cur_fd>0)
475                         close(cur_fd);
476                 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
477                 if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH)) < 0) {
478                         my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
479                         exit(1);
480                 }
481                 update_cur_epoch_file(cur_epoch);
482                 ret_fd = write_fd;
483         }
484         else {
485                 ret_fd = cur_fd;
486         }
487         return(ret_fd);
488 }
489
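/* Search the hash chain 'where' for an entry matching 'what': same source and
 * destination address and protocol, plus equal ports for unfragmented flows or
 * an equal IP id when both are fragments.  If 'prev' is non-NULL it receives
 * the link pointer of the match so the caller can unlink it. */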
490 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
491 {
492         struct Flow **flowpp;
493
494 #ifdef WALL
495         flowpp = 0;
496 #endif
497
498         if (prev) flowpp = *prev;
499
500         while (where) {
501                 if (where->sip.s_addr == what->sip.s_addr
502                                 && where->dip.s_addr == what->dip.s_addr
503                                 && where->proto == what->proto) {
504                         switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
505                                 case 0:
506                                         /* Both unfragmented */
507                                         if ((what->sp == where->sp)
508                                                         && (what->dp == where->dp)) goto done;
509                                         break;
510                                 case 2:
511                                         /* Both fragmented */
512                                         if (where->id == what->id) goto done;
513                                         break;
514                         }
515                 }
516                 flowpp = &where->next;
517                 where = where->next;
518         }
519 done:
520         if (prev) *prev = flowpp;
521         return where;
522 }
523
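/* Add 'flow' to the flow hash table.  COPY_INTO allocates a private copy (used
 * for slots of the pending ring), MOVE_INTO hands over ownership.  If a
 * matching entry already exists, its counters are merged instead; once the
 * last fragment of a fragmented flow arrives, the reassembled flow is
 * re-inserted as a regular one. */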
524         int put_into(struct Flow *flow, int flag
525 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
526                         , char *logbuf
527 #endif
528                     )
529 {
530         int ret = 0;
531         hash_t h;
532         struct Flow *flown, **flowpp;
533 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
534         char buf[64];
535 #endif
536
537         h = hash_flow(flow);
538 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
539         sprintf(buf, " %x H:%04x", (unsigned) flow, h);
540         strcat(logbuf, buf);
541 #endif
542         pthread_mutex_lock(&flows_mutex[h]);
543         flowpp = &flows[h];
544         if (!(flown = find(flows[h], flow, &flowpp))) {
545                 /* No suitable flow found - add */
546                 if (flag == COPY_INTO) {
547                         if ((flown = mem_alloc())) {
548                                 copy_flow(flow, flown);
549                                 flow = flown;
550                         } else {
551 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
552                                 my_log(LOG_ERR, "%s %s. %s",
553                                                 "mem_alloc():", strerror(errno), "packet lost");
554 #endif
555                                 return -1;
556                         }
557                 }
558                 flow->next = flows[h];
559                 flows[h] = flow;
560 #if ((DEBUG) & DEBUG_I)
561                 flows_total++;
562                 if (flow->flags & FLOW_FRAG) flows_fragmented++;
563 #endif
564 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
565                 if (flown) {
566                         sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
567                         strcat(logbuf, buf);
568                 }
569 #endif
570         } else {
571                 /* Found suitable flow - update */
572 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
573                 sprintf(buf, " +> %x", (unsigned) flown);
574                 strcat(logbuf, buf);
575 #endif
576                 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
577                         flown->mtime = flow->mtime;
578                 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
579                         flown->ctime = flow->ctime;
580                 flown->tcp_flags |= flow->tcp_flags;
581                 flown->size += flow->size;
582                 flown->pkts += flow->pkts;
583                 if (flow->flags & FLOW_FRAG) {
584                         /* Fragmented flows require some additional work */
585                         if (flow->flags & FLOW_TL) {
586                                 /*
587                                    ?FIXME?
588                                    Several packets with FLOW_TL (attack)
589                                    */
590                                 flown->sp = flow->sp;
591                                 flown->dp = flow->dp;
592                         }
593                         if (flow->flags & FLOW_LASTFRAG) {
594                                 /*
595                                    ?FIXME?
596                                    Several packets with FLOW_LASTFRAG (attack)
597                                    */
598                                 flown->sizeP = flow->sizeP;
599                         }
600                         flown->flags |= flow->flags;
601                         flown->sizeF += flow->sizeF;
602                         if ((flown->flags & FLOW_LASTFRAG)
603                                         && (flown->sizeF >= flown->sizeP)) {
604                                 /* All fragments received - flow reassembled */
605                                 *flowpp = flown->next;
606                                 pthread_mutex_unlock(&flows_mutex[h]);
607 #if ((DEBUG) & DEBUG_I)
608                                 flows_total--;
609                                 flows_fragmented--;
610 #endif
611                                 flown->id = 0;
612                                 flown->flags &= ~FLOW_FRAG;
613 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
614                                 strcat(logbuf," R");
615 #endif
616                                 ret = put_into(flown, MOVE_INTO
617 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
618                                                 , logbuf
619 #endif
620                                               );
621                         }
622                 }
623                 if (flag == MOVE_INTO) mem_free(flow);
624         }
625         pthread_mutex_unlock(&flows_mutex[h]);
626         return ret;
627 }
628
629 int onlyonce=0;
630
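/* Serialize either a PDU header (flow == 0) or a single flow record into the
 * buffer at 'p', following the field list of the selected NetFlow version,
 * and return the advanced write pointer. */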
631 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
632 {
633         int i;
634
635         for (i = 0; i < fields; i++) {
636 #if ((DEBUG) & DEBUG_F)
637                 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
638 #endif
639                 switch (format[i]) {
640                         case NETFLOW_IPV4_SRC_ADDR:
641                                 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
642                                 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
643                                 break;
644
645                         case NETFLOW_IPV4_DST_ADDR:
646                                 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
647                                 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
648                                         my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
649                                 }
650                                 p += NETFLOW_IPV4_DST_ADDR_SIZE;
651                                 break;
652
653                         case NETFLOW_INPUT_SNMP:
654                                 *((uint16_t *) p) = htons(flow->iif);
655                                 p += NETFLOW_INPUT_SNMP_SIZE;
656                                 break;
657
658                         case NETFLOW_OUTPUT_SNMP:
659                                 *((uint16_t *) p) = htons(flow->oif);
660                                 p += NETFLOW_OUTPUT_SNMP_SIZE;
661                                 break;
662
663                         case NETFLOW_PKTS_32:
664                                 *((uint32_t *) p) = htonl(flow->pkts);
665                                 p += NETFLOW_PKTS_32_SIZE;
666                                 break;
667
668                         case NETFLOW_BYTES_32:
669                                 *((uint32_t *) p) = htonl(flow->size);
670                                 p += NETFLOW_BYTES_32_SIZE;
671                                 break;
672
673                         case NETFLOW_FIRST_SWITCHED:
674                                 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
675                                 p += NETFLOW_FIRST_SWITCHED_SIZE;
676                                 break;
677
678                         case NETFLOW_LAST_SWITCHED:
679                                 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
680                                 p += NETFLOW_LAST_SWITCHED_SIZE;
681                                 break;
682
683                         case NETFLOW_L4_SRC_PORT:
684                                 *((uint16_t *) p) = flow->sp;
685                                 p += NETFLOW_L4_SRC_PORT_SIZE;
686                                 break;
687
688                         case NETFLOW_L4_DST_PORT:
689                                 *((uint16_t *) p) = flow->dp;
690                                 p += NETFLOW_L4_DST_PORT_SIZE;
691                                 break;
692
693                         case NETFLOW_PROT:
694                                 *((uint8_t *) p) = flow->proto;
695                                 p += NETFLOW_PROT_SIZE;
696                                 break;
697
698                         case NETFLOW_SRC_TOS:
699                                 *((uint8_t *) p) = flow->tos;
700                                 p += NETFLOW_SRC_TOS_SIZE;
701                                 break;
702
703                         case NETFLOW_TCP_FLAGS:
704                                 *((uint8_t *) p) = flow->tcp_flags;
705                                 p += NETFLOW_TCP_FLAGS_SIZE;
706                                 break;
707
708                         case NETFLOW_VERSION:
709                                 *((uint16_t *) p) = htons(netflow->Version);
710                                 p += NETFLOW_VERSION_SIZE;
711                                 break;
712
713                         case NETFLOW_COUNT:
714                                 *((uint16_t *) p) = htons(emit_count);
715                                 p += NETFLOW_COUNT_SIZE;
716                                 break;
717
718                         case NETFLOW_UPTIME:
719                                 *((uint32_t *) p) = htonl(getuptime(&emit_time));
720                                 p += NETFLOW_UPTIME_SIZE;
721                                 break;
722
723                         case NETFLOW_UNIX_SECS:
724                                 *((uint32_t *) p) = htonl(emit_time.sec);
725                                 p += NETFLOW_UNIX_SECS_SIZE;
726                                 break;
727
728                         case NETFLOW_UNIX_NSECS:
729                                 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
730                                 p += NETFLOW_UNIX_NSECS_SIZE;
731                                 break;
732
733                         case NETFLOW_FLOW_SEQUENCE:
734                                 //*((uint32_t *) p) = htonl(emit_sequence);
735                                 *((uint32_t *) p) = 0;
736                                 p += NETFLOW_FLOW_SEQUENCE_SIZE;
737                                 break;
738
739                         case NETFLOW_PAD8:
740                                 /* Unsupported (uint8_t) */
741                         case NETFLOW_ENGINE_TYPE:
742                         case NETFLOW_ENGINE_ID:
743                         case NETFLOW_FLAGS7_1:
744                         case NETFLOW_SRC_MASK:
745                         case NETFLOW_DST_MASK:
746                                 if (!onlyonce) {
747                                         my_log(LOG_CRIT, "Adding SRC/DST masks: this version of fprobe is seriously broken\n");
748                                         onlyonce=1;
749                                 }
750                                 *((uint8_t *) p) = 0;
751                                 p += NETFLOW_PAD8_SIZE;
752                                 break;
753                         case NETFLOW_XID:
754                                 *((uint32_t *) p) = flow->xid;
755                                 p += NETFLOW_XID_SIZE;
756                                 break;
757                         case NETFLOW_PAD16:
758                                 /* Unsupported (uint16_t) */
759                         case NETFLOW_SRC_AS:
760                         case NETFLOW_DST_AS:
761                         case NETFLOW_FLAGS7_2:
762                                 *((uint16_t *) p) = 0;
763                                 p += NETFLOW_PAD16_SIZE;
764                                 break;
765
766                         case NETFLOW_PAD32:
767                                 /* Unsupported (uint32_t) */
768                         case NETFLOW_IPV4_NEXT_HOP:
769                         case NETFLOW_ROUTER_SC:
770                                 *((uint32_t *) p) = 0;
771                                 p += NETFLOW_PAD32_SIZE;
772                                 break;
773
774                         default:
775                                 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
776                                                 format, i, format[i]);
777                                 exit(1);
778                 }
779         }
780 #if ((DEBUG) & DEBUG_F)
781         my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
782 #endif
783         return p;
784 }
785
786 void setuser() {
787         /*
788            Workaround for clone()-based threads
789            Try to change EUID independently of main thread
790            */
791         if (pw) {
792                 setgroups(0, NULL);
793                 setregid(pw->pw_gid, pw->pw_gid);
794                 setreuid(pw->pw_uid, pw->pw_uid);
795         }
796 }
797
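/* emit_thread: drain the flows_emit list, pack records into emit_packet and
 * flush a PDU to every peer (data file, mirror or rotating collector) once
 * MaxFlows records are batched or emit_timeout expires; the optional -t
 * setting paces output by sleeping after every emit_rate_bytes bytes. */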
798 void *emit_thread()
799 {
800         struct Flow *flow;
801         void *p;
802         struct timeval now;
803         struct timespec timeout;
804         int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
805
806         p = (void *) &emit_packet + netflow->HeaderSize;
807         timeout.tv_nsec = 0;
808
809         setuser();
810
811         for (;;) {
812                 pthread_mutex_lock(&emit_mutex);
813                 while (!flows_emit) {
814                         gettimeofday(&now, 0);
815                         timeout.tv_sec = now.tv_sec + emit_timeout;
816                         /* Do not wait until emit_packet is filled - it may take too long */
817                         if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
818                                 pthread_mutex_unlock(&emit_mutex);
819                                 goto sendit;
820                         }
821                 }
822                 flow = flows_emit;
823                 flows_emit = flows_emit->next;
824 #if ((DEBUG) & DEBUG_I)
825                 emit_queue--;
826 #endif          
827                 pthread_mutex_unlock(&emit_mutex);
828
829 #ifdef UPTIME_TRICK
830                 if (!emit_count) {
831                         gettime(&start_time);
832                         start_time.sec -= start_time_offset;
833                 }
834 #endif
835                 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
836                 mem_free(flow);
837                 emit_count++;
838 #ifdef PF2_DEBUG
839                 printf("Emit count = %d\n", emit_count);
840                 fflush(stdout);
841 #endif
842                 if (emit_count == netflow->MaxFlows) {
843 sendit:
844                         gettime(&emit_time);
845                         p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
846                         size = netflow->HeaderSize + emit_count * netflow->FlowSize;
847                         /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
848 #ifdef STD_NETFLOW_PDU
849                         if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
850 #endif
851                         peer_rot_cur = 0;
852                         for (i = 0; i < npeers; i++) {
853                                 if (peers[i].type == PEER_FILE) {
854                                         if (netflow->SeqOffset)
855                                                 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
856                                         peers[i].write_fd = get_data_file_fd(peers[i].fname, peers[i].write_fd);
857                                         ret = write(peers[i].write_fd, emit_packet, size);
858                                         if (ret < size) {
859
860 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
861                                                 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
862                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
863 #endif
864 #undef MESSAGES
865                                         }
866 #if ((DEBUG) & DEBUG_E)
867                                         commaneelse {
868                                                 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
869                                                                 emit_count, i + 1, peers[i].seq);
870                                         }
871 #endif
872                                         peers[i].seq += emit_count;
873
874                                         /* Rate limit */
875                                         if (emit_rate_bytes) {
876                                                 sent += size;
877                                                 delay = sent / emit_rate_bytes;
878                                                 if (delay) {
879                                                         sent %= emit_rate_bytes;
880                                                         timeout.tv_sec = 0;
881                                                         timeout.tv_nsec = emit_rate_delay * delay;
882                                                         while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
883                                                 }
884                                         }
885                                 }
886                                 else
887                                         if (peers[i].type == PEER_MIRROR) goto sendreal;
888                                         else
889                                                 if (peers[i].type == PEER_ROTATE) 
890                                                         if (peer_rot_cur++ == peer_rot_work) {
891 sendreal:
892                                                                 if (netflow->SeqOffset)
893                                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
894                                                                 ret = send(peers[i].write_fd, emit_packet, size, 0);
895                                                                 if (ret < size) {
896 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
897                                                                         my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
898                                                                                         i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
899 #endif
900                                                                 }
901 #if ((DEBUG) & DEBUG_E)
902                                                                 commaneelse {
903                                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
904                                                                                         emit_count, i + 1, peers[i].seq);
905                                                                 }
906 #endif
907                                                                 peers[i].seq += emit_count;
908
909                                                                 /* Rate limit */
910                                                                 if (emit_rate_bytes) {
911                                                                         sent += size;
912                                                                         delay = sent / emit_rate_bytes;
913                                                                         if (delay) {
914                                                                                 sent %= emit_rate_bytes;
915                                                                                 timeout.tv_sec = 0;
916                                                                                 timeout.tv_nsec = emit_rate_delay * delay;
917                                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
918                                                                         }
919                                                                 }
920                                                         }
921                         }
922                         if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
923                         emit_sequence += emit_count;
924                         emit_count = 0;
925 #if ((DEBUG) & DEBUG_I)
926                         emit_pkts++;
927 #endif
928                 }
929         }
930 }       
931
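/* unpending_thread: pick up packets that cap_thread() marked FLOW_PENDING in
 * the pending ring and merge them into the flow hash via put_into(COPY_INTO). */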
932 void *unpending_thread()
933 {
934         struct timeval now;
935         struct timespec timeout;
936 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
937         char logbuf[256];
938 #endif
939
940         setuser();
941
942         timeout.tv_nsec = 0;
943         pthread_mutex_lock(&unpending_mutex);
944
945         for (;;) {
946                 while (!(pending_tail->flags & FLOW_PENDING)) {
947                         gettimeofday(&now, 0);
948                         timeout.tv_sec = now.tv_sec + unpending_timeout;
949                         pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
950                 }
951
952 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
953                 *logbuf = 0;
954 #endif
955                 if (put_into(pending_tail, COPY_INTO
956 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
957                                         , logbuf
958 #endif
959                             ) < 0) {
960 #if ((DEBUG) & DEBUG_I)
961                         pkts_lost_unpending++;
962 #endif                          
963                 }
964
965 #if ((DEBUG) & DEBUG_U)
966                 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
967 #endif
968
969                 pending_tail->flags = 0;
970                 pending_tail = pending_tail->next;
971 #if ((DEBUG) & DEBUG_I)
972                 pkts_pending_done++;
973 #endif
974         }
975 }
976
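/* scan_thread: every scan_interval seconds walk the whole hash table, move
 * flows that exceeded the inactive or active lifetime onto the emit list, and
 * recycle expired fragmented flows through put_into() as regular flows. */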
977 void *scan_thread()
978 {
979 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
980         char logbuf[256];
981 #endif
982         int i;
983         struct Flow *flow, **flowpp;
984         struct Time now;
985         struct timespec timeout;
986
987         setuser();
988
989         timeout.tv_nsec = 0;
990         pthread_mutex_lock(&scan_mutex);
991
992         for (;;) {
993                 gettime(&now);
994                 timeout.tv_sec = now.sec + scan_interval;
995                 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
996
997                 gettime(&now);
998 #if ((DEBUG) & DEBUG_S)
999                 my_log(LOG_DEBUG, "S: %d", now.sec);
1000 #endif
1001                 for (i = 0; i < 1 << HASH_BITS ; i++) {
1002                         pthread_mutex_lock(&flows_mutex[i]);
1003                         flow = flows[i];
1004                         flowpp = &flows[i];
1005                         while (flow) {
1006                                 if (flow->flags & FLOW_FRAG) {
1007                                         /* Process fragmented flow */
1008                                         if ((now.sec - flow->mtime.sec) > frag_lifetime) {
1009                                                 /* Fragmented flow expired - put it into special chain */
1010 #if ((DEBUG) & DEBUG_I)
1011                                                 flows_fragmented--;
1012                                                 flows_total--;
1013 #endif
1014                                                 *flowpp = flow->next;
1015                                                 flow->id = 0;
1016                                                 flow->flags &= ~FLOW_FRAG;
1017                                                 flow->next = scan_frag_dreg;
1018                                                 scan_frag_dreg = flow;
1019                                                 flow = *flowpp;
1020                                                 continue;
1021                                         }
1022                                 } else {
1023                                         /* Flow is not fragmented */
1024                                         if ((now.sec - flow->mtime.sec) > inactive_lifetime
1025                                                         || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
1026                                                 /* Flow expired */
1027 #if ((DEBUG) & DEBUG_S)
1028                                                 my_log(LOG_DEBUG, "S: E %x", flow);
1029 #endif
1030 #if ((DEBUG) & DEBUG_I)
1031                                                 flows_total--;
1032 #endif
1033                                                 *flowpp = flow->next;
1034                                                 pthread_mutex_lock(&emit_mutex);
1035                                                 flow->next = flows_emit;
1036                                                 flows_emit = flow;
1037 #if ((DEBUG) & DEBUG_I)
1038                                                 emit_queue++;
1039 #endif                          
1040                                                 pthread_mutex_unlock(&emit_mutex);
1041                                                 flow = *flowpp;
1042                                                 continue;
1043                                         }
1044                                 }
1045                                 flowpp = &flow->next;
1046                                 flow = flow->next;
1047                         } /* chain loop */
1048                         pthread_mutex_unlock(&flows_mutex[i]);
1049                 } /* hash loop */
1050                 if (flows_emit) pthread_cond_signal(&emit_cond);
1051
1052                 while (scan_frag_dreg) {
1053                         flow = scan_frag_dreg;
1054                         scan_frag_dreg = flow->next;
1055 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1056                         *logbuf = 0;
1057 #endif
1058                         put_into(flow, MOVE_INTO
1059 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1060                                         , logbuf
1061 #endif
1062                                 );
1063 #if ((DEBUG) & DEBUG_S)
1064                         my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
1065 #endif
1066                 }
1067         }
1068 }
1069
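/* cap_thread: read packets from the ULOG netlink socket, decode the IP header
 * (and, for first fragments, the transport header) into the next free slot of
 * the pending ring, then wake unpending_thread(). */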
1070 void *cap_thread()
1071 {
1072         struct ulog_packet_msg *ulog_msg;
1073         struct ip *nl;
1074         void *tl;
1075         struct Flow *flow;
1076         int len, off_frag, psize;
1077 #if ((DEBUG) & DEBUG_C)
1078         char buf[64];
1079         char logbuf[256];
1080 #endif
1081         int challenge;
1082
1083         setuser();
1084
1085         while (!killed) {
1086                 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1087                 if (len <= 0) {
1088                         my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1089                         continue;
1090                 }
1091                 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1092
1093 #if ((DEBUG) & DEBUG_C)
1094                         sprintf(logbuf, "C: %d", ulog_msg->data_len);
1095 #endif
1096
1097                         nl = (void *) &ulog_msg->payload;
1098                         psize = ulog_msg->data_len;
1099
1100                         /* Sanity check */
1101                         if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1102 #if ((DEBUG) & DEBUG_C)
1103                                 strcat(logbuf, " U");
1104                                 my_log(LOG_DEBUG, "%s", logbuf);
1105 #endif
1106 #if ((DEBUG) & DEBUG_I)
1107                                 pkts_ignored++;
1108 #endif
1109                                 continue;
1110                         }
1111
1112                         if (pending_head->flags) {
1113 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1114                                 my_log(LOG_ERR,
1115 # if ((DEBUG) & DEBUG_C)
1116                                                 "%s %s %s", logbuf,
1117 # else
1118                                                 "%s %s",
1119 # endif
1120                                                 "pending queue full:", "packet lost");
1121 #endif
1122 #if ((DEBUG) & DEBUG_I)
1123                                 pkts_lost_capture++;
1124 #endif
1125                                 goto done;
1126                         }
1127
1128 #if ((DEBUG) & DEBUG_I)
1129                         pkts_total++;
1130 #endif
1131
1132                         flow = pending_head;
1133
1134                         /* ?FIXME? Add sanity check for ip_len? */
1135                         flow->size = ntohs(nl->ip_len);
1136 #if ((DEBUG) & DEBUG_I)
1137                         size_total += flow->size;
1138 #endif
1139
1140                         flow->sip = nl->ip_src;
1141                         flow->dip = nl->ip_dst;
1142                         flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1143                         
1144                         /* Calling this syscall on every flow is going to be expensive.
1145                          * We should keep a local hash table; for now just bear the overhead... - Sapan */
1146                         flow->xid = challenge = 0;
1147                         if (ulog_msg->mark > 0) {
1148                                 flow->xid = get_vhi_name(ulog_msg->mark);
1149                                 challenge = get_vhi_name(ulog_msg->mark);
1150                         }
1151                         if (flow->xid < 1 || flow->xid != challenge)
1152                                 flow->xid = ulog_msg->mark;
1153
1154
1155                         if ((flow->dip.s_addr == inet_addr("64.34.177.39")) || (flow->sip.s_addr == inet_addr("64.34.177.39"))) {
1156                                 my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->xid);
1157                         }
1158                         flow->iif = snmp_index(ulog_msg->indev_name);
1159                         flow->oif = snmp_index(ulog_msg->outdev_name);
1160                         flow->proto = nl->ip_p;
1161                         flow->id = 0;
1162                         flow->tcp_flags = 0;
1163                         flow->pkts = 1;
1164                         flow->sizeF = 0;
1165                         flow->sizeP = 0;
1166                         /* Packets captured from the OUTPUT chain don't contain a valid timestamp */
1167                         if (ulog_msg->timestamp_sec) {
1168                                 flow->ctime.sec = ulog_msg->timestamp_sec;
1169                                 flow->ctime.usec = ulog_msg->timestamp_usec;
1170                         } else gettime(&flow->ctime);
1171                         flow->mtime = flow->ctime;
1172
1173                         off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1174
1175                         /*
1176                            Offset (from network layer) to transport layer header/IP data
1177                            IOW IP header size ;-)
1178
1179                            ?FIXME?
1180                            Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1181                            */
1182                         off_tl = nl->ip_hl << 2;
1183                         tl = (void *) nl + off_tl;
1184
1185                         /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1186                         flow->sizeF = ntohs(nl->ip_len) - off_tl;
1187                         psize -= off_tl;
1188                         if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1189                         if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1190
1191                         if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1192                                 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1193 #if ((DEBUG) & DEBUG_C)
1194                                 strcat(logbuf, " F");
1195 #endif
1196 #if ((DEBUG) & DEBUG_I)
1197                                 pkts_total_fragmented++;
1198 #endif
1199                                 flow->flags |= FLOW_FRAG;
1200                                 flow->id = nl->ip_id;
1201
1202                                 if (!(ntohs(nl->ip_off) & IP_MF)) {
1203                                         /* The last fragment (IP_MF == 0) tells us the whole datagram size */
1204                                         flow->flags |= FLOW_LASTFRAG;
1205                                         /* size = frag_offset*8 + data_size */
1206                                         flow->sizeP = off_frag + flow->sizeF;
1207                                 }
1208                         }
1209
1210 #if ((DEBUG) & DEBUG_C)
1211                         sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1212                         strcat(logbuf, buf);
1213                         sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1214                         strcat(logbuf, buf);
1215 #endif
1216
1217                         /*
1218                            Fortunately most interesting transport layer information fit
1219                            into first 8 bytes of IP data field (minimal nonzero size).
1220                            Thus we don't need actual packet reassembling to build whole
1221                            transport layer data. We only check the fragment offset for
1222                            zero value to find packet with this information.
1223                            */
1224                         if (!off_frag && psize >= 8) {
1225                                 switch (flow->proto) {
1226                                         case IPPROTO_TCP:
1227                                         case IPPROTO_UDP:
1228                                                 flow->sp = ((struct udphdr *)tl)->uh_sport;
1229                                                 flow->dp = ((struct udphdr *)tl)->uh_dport;
1230                                                 goto tl_known;
1231
1232 #ifdef ICMP_TRICK
1233                                         case IPPROTO_ICMP:
1234                                                 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1235                                                 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1236                                                 goto tl_known;
1237 #endif
1238 #ifdef ICMP_TRICK_CISCO
1239                                         case IPPROTO_ICMP:
1240                                                 flow->dp = *((int32_t *) tl);
1241                                                 goto tl_known;
1242 #endif
1243
1244                                         default:
1245                                                 /* Unknown transport layer */
1246 #if ((DEBUG) & DEBUG_C)
1247                                                 strcat(logbuf, " U");
1248 #endif
1249                                                 flow->sp = 0;
1250                                                 flow->dp = 0;
1251                                                 break;
1252
1253 tl_known:
1254 #if ((DEBUG) & DEBUG_C)
1255                                                 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1256                                                 strcat(logbuf, buf);
1257 #endif
1258                                                 flow->flags |= FLOW_TL;
1259                                 }
1260                         }
1261
1262                         /* Check for tcp flags presence (including CWR and ECE). */
1263                         if (flow->proto == IPPROTO_TCP
1264                                         && off_frag < 16
1265                                         && psize >= 16 - off_frag) {
1266                                 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1267 #if ((DEBUG) & DEBUG_C)
1268                                 sprintf(buf, " TCP:%x", flow->tcp_flags);
1269                                 strcat(logbuf, buf);
1270 #endif
1271                         }
1272
1273 #if ((DEBUG) & DEBUG_C)
1274                         sprintf(buf, " => %x", (unsigned) flow);
1275                         strcat(logbuf, buf);
1276                         my_log(LOG_DEBUG, "%s", logbuf);
1277 #endif
1278
1279 #if ((DEBUG) & DEBUG_I)
1280                         pkts_pending++;
1281                         pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1282                         if (pending_queue_trace < pending_queue_trace_candidate)
1283                                 pending_queue_trace = pending_queue_trace_candidate;
1284 #endif
1285
1286                         /* Flow complete - inform unpending_thread() about it */
1287                         pending_head->flags |= FLOW_PENDING;
1288                         pending_head = pending_head->next;
1289 done:
1290                         pthread_cond_signal(&unpending_cond);
1291                 }
1292         }
1293         return 0;
1294 }
1295
1296 /* Copied out of CoDemux */
1297
1298 static int init_daemon() {
1299         pid_t pid;
1300         FILE *pidfile;
1301
1302         pidfile = fopen(PIDFILE, "w");
1303         if (pidfile == NULL) {
1304                 my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
1305                 return(-1);
1306         }
1307         if ((pid = fork()) < 0) {
1308                 fclose(pidfile);
1309                 my_log(LOG_ERR, "Could not fork!\n");
1310                 return(-1);
1311         }
1312         else if (pid != 0) {
1313                 /* I'm the parent; write down the child pid */
1314                 fprintf(pidfile, "%u\n", pid);
1315                 fclose(pidfile);
1316                 exit(0);
1317         }
1318
1319         /* close the pid file */
1320         fclose(pidfile);
1321
1322         /* routines for any daemon process
1323            1. create a new session 
1324            2. change directory to the root
1325            3. change the file creation permission 
1326            */
1327         setsid();
1328         chdir("/var/local/fprobe");
1329         umask(0);
1330
1331         return(0);
1332 }
1333
1334 int main(int argc, char **argv)
1335 {
1336         char errpbuf[512];
1337         char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1338         int c, i, write_fd, memory_limit = 0;
1339         struct addrinfo hints, *res;
1340         struct sockaddr_in saddr;
1341         pthread_attr_t tattr;
1342         struct sigaction sigact;
1343         static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1344         struct timeval timeout;
1345
1346         sched_min = sched_get_priority_min(SCHED);
1347         sched_max = sched_get_priority_max(SCHED);
1348
1349         memset(&saddr, 0 , sizeof(saddr));
1350         memset(&hints, 0 , sizeof(hints));
1351         hints.ai_flags = AI_PASSIVE;
1352         hints.ai_family = AF_INET;
1353         hints.ai_socktype = SOCK_DGRAM;
1354
1355         /* Process command line options */
1356
1357         opterr = 0;
1358         while ((c = my_getopt(argc, argv, parms)) != -1) {
1359                 switch (c) {
1360                         case '?':
1361                                 usage();
1362
1363                         case 'h':
1364                                 usage();
1365                 }
1366         }
1367
1368         if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1369         if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1370         if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1371         if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1372         if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1373         if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1374         if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1375         if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1376         if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1377         if (parms[nflag].count) {
1378                 switch (atoi(parms[nflag].arg)) {
1379                         case 1:
1380                                 netflow = &NetFlow1;
1381                                 break;
1382
1383                         case 5:
1384                                 break;
1385
1386                         case 7:
1387                                 netflow = &NetFlow7;
1388                                 break;
1389
1390                         default:
1391                                 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1392                                 exit(1);
1393                 }
1394         }
1395         if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1396         if (parms[lflag].count) {
1397                 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1398                         *log_suffix++ = 0;
1399                         if (*log_suffix) {
1400                                 sprintf(errpbuf, "[%s]", log_suffix);
1401                                 strcat(ident, errpbuf);
1402                         }
1403                 }
1404                 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1405                 if (log_suffix) *--log_suffix = ':';
1406         }
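        /* pidfilepath will hold "PID_DIR/<ident>.pid"; sizeof(PID_DIR) already
           counts a terminating NUL, so the size computed below ('/' + ident +
           '.' + "pid" + NUL) leaves one spare byte. */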
1407         if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1408 err_malloc:
1409                 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1410                 exit(1);
1411         }
1412         sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1413         if (parms[qflag].count) {
1414                 pending_queue_length = atoi(parms[qflag].arg);
1415                 if (pending_queue_length < 1) {
1416                         fprintf(stderr, "Illegal %s\n", "pending queue length");
1417                         exit(1);
1418                 }
1419         }
1420         if (parms[rflag].count) {
1421                 schedp.sched_priority = atoi(parms[rflag].arg);
1422                 if (schedp.sched_priority
1423                                 && (schedp.sched_priority < sched_min
1424                                         || schedp.sched_priority > sched_max)) {
1425                         fprintf(stderr, "Illegal %s\n", "realtime priority");
1426                         exit(1);
1427                 }
1428         }
1429         if (parms[Bflag].count) {
1430                 sockbufsize = atoi(parms[Bflag].arg) << 10;
1431         }
1432         if (parms[bflag].count) {
1433                 bulk_quantity = atoi(parms[bflag].arg);
1434                 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1435                         fprintf(stderr, "Illegal %s\n", "bulk size");
1436                         exit(1);
1437                 }
1438         }
1439         if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
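        /* -X apparently takes SNMP interface-index rules of the form
           "basename:base[,basename:base...]", e.g. "eth:100,ppp:200"; the parser
           below alternates strtok() on ':' (interface basename) and ',' (numeric
           base), restoring the separators as it goes. */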
1440         if (parms[Xflag].count) {
1441                 for(i = 0; parms[Xflag].arg[i]; i++)
1442                         if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1443                 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1444                         goto err_malloc;
1445                 rule = strtok(parms[Xflag].arg, ":");
1446                 for (i = 0; rule; i++) {
1447                         snmp_rules[i].len = strlen(rule);
1448                         if (snmp_rules[i].len > IFNAMSIZ) {
1449                                 fprintf(stderr, "Illegal %s\n", "interface basename");
1450                                 exit(1);
1451                         }
1452                         strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1453                         if (!*(rule - 1)) *(rule - 1) = ',';
1454                         rule = strtok(NULL, ",");
1455                         if (!rule) {
1456                                 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1457                                 exit(1);
1458                         }
1459                         snmp_rules[i].base = atoi(rule);
1460                         *(rule - 1) = ':';
1461                         rule = strtok(NULL, ":");
1462                 }
1463                 nsnmp_rules = i;
1464         }
1465         if (parms[tflag].count)
1466                 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1467         if (parms[aflag].count) {
1468                 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1469 bad_lhost:
1470                         fprintf(stderr, "Illegal %s\n", "source address");
1471                         exit(1);
1472                 } else {
1473                         saddr = *((struct sockaddr_in *) res->ai_addr);
1474                         freeaddrinfo(res);
1475                 }
1476         }
1477         if (parms[uflag].count) 
1478                 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1479                         fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1480                         exit(1);
1481                 }
1482
1483
1484         /* Process collector parameters. Brrrr... :-[ */
1485
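        /*
         * Each remaining command-line argument names one collector.  Judging from
         * the parsing below, the accepted shape is
         *
         *      dhost:dport[/lhost[/type]]
         *
         * where lhost picks the local address to bind() the sending socket to and
         * type is 'm' (mirror, the default) or 'r' (rotate), e.g.
         * "10.0.0.1:2055/0.0.0.0/r".
         */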
1486         npeers = argc - optind;
1487         if (npeers > 0) {
1488                 /* Send to remote Netflow collector */
1489                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1490                 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1491                         dhost = argv[i];
1492                         if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1493                         *dport++ = 0;
1494                         if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1495                                 fprintf(stderr, "socket(): %s\n", strerror(errno));
1496                                 exit(1);
1497                         }
1498                         peers[npeers].write_fd = write_fd;
1499                         peers[npeers].type = PEER_MIRROR;
1500                         peers[npeers].laddr = saddr;
1501                         peers[npeers].seq = 0;
1502                         if ((lhost = strchr(dport, '/'))) {
1503                                 *lhost++ = 0;
1504                                 if ((type = strchr(lhost, '/'))) {
1505                                         *type++ = 0;
1506                                         switch (*type) {
1507                                                 case 0:
1508                                                 case 'm':
1509                                                         break;
1510
1511                                                 case 'r':
1512                                                         peers[npeers].type = PEER_ROTATE;
1513                                                         npeers_rot++;
1514                                                         break;
1515
1516                                                 default:
1517                                                         goto bad_collector;
1518                                         }
1519                                 }
1520                                 if (*lhost) {
1521                                         if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1522                                         peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1523                                         freeaddrinfo(res);
1524                                 }
1525                         }
1526                         if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1527                                                 sizeof(struct sockaddr_in))) {
1528                                 fprintf(stderr, "bind(): %s\n", strerror(errno));
1529                                 exit(1);
1530                         }
1531                         if (getaddrinfo(dhost, dport, &hints, &res)) {
1532 bad_collector:
1533                                 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1534                                 exit(1);
1535                         }
1536                         peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1537                         freeaddrinfo(res);
1538                         if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1539                                                 sizeof(struct sockaddr_in))) {
1540                                 fprintf(stderr, "connect(): %s\n", strerror(errno));
1541                                 exit(1);
1542                         }
1543
1544                         /* Restore command line */
1545                         if (type) *--type = '/';
1546                         if (lhost) *--lhost = '/';
1547                         *--dport = ':';
1548                 }
1549         }
1550         else if (parms[fflag].count) {
1551                 // -f: write flow records to a file (rotated by epoch) instead of a network collector
1552                 if (!(peers = malloc((npeers + 1) * sizeof(struct peer)))) goto err_malloc;    /* slot [npeers] is filled in just below */
1553                 if (!(peers[npeers].fname = calloc(1, strnlen(parms[fflag].arg, MAX_PATH_LEN) + 1))) goto err_malloc;
1554                 strncpy(peers[npeers].fname, parms[fflag].arg, strnlen(parms[fflag].arg, MAX_PATH_LEN));
1555
1556                 peers[npeers].write_fd = START_DATA_FD;
1557                 peers[npeers].type = PEER_FILE;
1558                 peers[npeers].seq = 0;
1559
1560                 read_cur_epoch();
1561                 npeers++;
1562         }
1563         else 
1564                 usage();
1565
1566
1567         if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1568         ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1569         if (!ulog_handle) {
1570                 fprintf(stderr, "libipulog initialization error: %s",
1571                                 ipulog_strerror(ipulog_errno));
1572                 exit(1);
1573         }
1574         if (sockbufsize)
1575                 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1576                                         &sockbufsize, sizeof(sockbufsize)) < 0)
1577                         fprintf(stderr, "setsockopt(): %s", strerror(errno));
1578
1579         /* Open the log and daemonize; the supervising re-fork below only happens when the log destination does not include stdout */
1580
1581         my_log_open(ident, verbosity, log_dest);
1582
1583         init_daemon();
1584
1585         if (!(log_dest & 2)) {
1586                 /* Crash-proofing - Sapan */
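                /* Supervisor loop: fork a worker that detaches from the terminal
                   and breaks out to do the real work; the parent blocks in wait3()
                   and re-forks a replacement whenever the worker dies. */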
1587                 while (1) {
1588                         int pid=fork();
1589                         if (pid==-1) {
1590                                 fprintf(stderr, "fork(): %s", strerror(errno));
1591                                 exit(1);
1592                         }
1593                         else if (pid==0) {
1594                                 setsid();
1595                                 freopen("/dev/null", "r", stdin);
1596                                 freopen("/dev/null", "w", stdout);
1597                                 freopen("/dev/null", "w", stderr);
1598                                 break;
1599                         }
1600                         else {
1601                                 while (wait3(NULL,0,NULL) < 1);
1602                         }
1603                 }
1604         } else {
1605                 setvbuf(stdout, (char *)0, _IONBF, 0);
1606                 setvbuf(stderr, (char *)0, _IONBF, 0);
1607         }
1608
1609         pid = getpid();
1610         sprintf(errpbuf, "[%ld]", (long) pid);
1611         strcat(ident, errpbuf);
1612
1613         /* Initialization */
1614
1615         hash_init(); /* Actually for crc16 only */
1616         mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
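        /* one mutex per hash bucket (1 << HASH_BITS of them), presumably guarding
           concurrent access to the flow hash table from the worker threads */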
1617         for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1618
1619 #ifdef UPTIME_TRICK
1620         /* Hope 12 days is enough :-/ */
1621         start_time_offset = 1 << 20;
1622
1623         /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1624 #endif
1625         gettime(&start_time);
1626
1627         /*
1628            Build static pending queue as circular buffer.
1629            */
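        /* pending_queue_length pre-allocated Flow entries linked into a ring: the
           capture path advances pending_head as packets are completed and signals
           unpending_cond; unpending_thread() presumably drains the ring behind it
           via pending_tail. */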
1630         if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1631         pending_tail = pending_head;
1632         for (i = pending_queue_length - 1; i--;) {
1633                 if (!(pending_tail->next = mem_alloc())) {
1634 err_mem_alloc:
1635                         my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1636                         exit(1);
1637                 }
1638                 pending_tail = pending_tail->next;
1639         }
1640         pending_tail->next = pending_head;
1641         pending_tail = pending_head;
1642
1643         sigemptyset(&sig_mask);
1644         sigact.sa_handler = &sighandler;
1645         sigact.sa_mask = sig_mask;
1646         sigact.sa_flags = 0;
1647         sigaddset(&sig_mask, SIGTERM);
1648         sigaction(SIGTERM, &sigact, 0);
1649 #if ((DEBUG) & DEBUG_I)
1650         sigaddset(&sig_mask, SIGUSR1);
1651         sigaction(SIGUSR1, &sigact, 0);
1652 #endif
1653         if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1654                 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1655                 exit(1);
1656         }
1657
1658         my_log(LOG_INFO, "Starting %s...", VERSION);
1659
1660         if (parms[cflag].count) {
1661                 if (chdir(parms[cflag].arg) || chroot(".")) {
1662                         my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1663                         exit(1);
1664                 }
1665         }
1666
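        /* Spawn the THREADS - 1 worker threads from threads[] (emit, scan,
           unpending, cap).  When a realtime priority was requested with -r,
           each successive thread is created with a priority one step higher
           than the previous one, stepping up toward the requested value. */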
1667         schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1668         pthread_attr_init(&tattr);
1669         for (i = 0; i < THREADS - 1; i++) {
1670                 if (schedp.sched_priority > 0) {
1671                         if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1672                                         (pthread_attr_setschedparam(&tattr, &schedp))) {
1673                                 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1674                                 exit(1);
1675                         }
1676                 }
1677                 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1678                         my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1679                         exit(1);
1680                 }
1681                 pthread_detach(thid);
1682                 schedp.sched_priority++;
1683         }
1684
1685         if (pw) {
1686                 if (setgroups(0, NULL)) {
1687                         my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1688                         exit(1);
1689                 }
1690                 if (setregid(pw->pw_gid, pw->pw_gid)) {
1691                         my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1692                         exit(1);
1693                 }
1694                 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1695                         my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1696                         exit(1);
1697                 }
1698         }
1699
1700         if (!(pidfile = fopen(pidfilepath, "w")))
1701                 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1702         else {
1703                 fprintf(pidfile, "%ld\n", (long) pid);
1704                 fclose(pidfile);
1705         }
1706
1707         my_log(LOG_INFO, "pid: %d", pid);
1708         my_log(LOG_INFO, "options: U=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1709                         "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1710                         ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1711                         netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1712                         memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1713                         emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1714                         parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1715         for (i = 0; i < nsnmp_rules; i++) {
1716                 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1717                                 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1718         }
1719         for (i = 0; i < npeers; i++) {
1720                 switch (peers[i].type) {
1721                         case PEER_MIRROR:
1722                                 c = 'm';
1723                                 break;
1724                         case PEER_ROTATE:
1725                                 c = 'r';
1726                                 break;
                        default:
                                c = 'f';        /* PEER_FILE (the -f case): keeps 'c' from being used uninitialized */
                                break;
1727                 }
1728                 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1729                 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1730                                 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1731         }
1732
1733         pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1734
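        /* Main loop: sleep up to scan_interval between signals.  On SIGTERM,
           collapse the lifetimes and timeouts so the whole flow cache gets
           emitted, then keep looping until the pending ring, the flow cache and
           the emit queue have drained. */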
1735         timeout.tv_usec = 0;
1736         while (!killed
1737                         || (total_elements - free_elements - pending_queue_length)
1738                         || emit_count
1739                         || pending_tail->flags) {
1740
1741                 if (!sigs) {
1742                         timeout.tv_sec = scan_interval;
1743                         select(0, 0, 0, 0, &timeout);
1744                 }
1745
1746                 if (sigs & SIGTERM_MASK && !killed) {
1747                         sigs &= ~SIGTERM_MASK;
1748                         my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1749                         scan_interval = 1;
1750                         frag_lifetime = -1;
1751                         active_lifetime = -1;
1752                         inactive_lifetime = -1;
1753                         emit_timeout = 1;
1754                         unpending_timeout = 1;
1755                         killed = 1;
1756                         pthread_cond_signal(&scan_cond);
1757                         pthread_cond_signal(&unpending_cond);
1758                 }
1759
1760 #if ((DEBUG) & DEBUG_I)
1761                 if (sigs & SIGUSR1_MASK) {
1762                         sigs &= ~SIGUSR1_MASK;
1763                         info_debug();
1764                 }
1765 #endif
1766         }
1767         remove(pidfilepath);
1768 #if ((DEBUG) & DEBUG_I)
1769         info_debug();
1770 #endif
1771         my_log(LOG_INFO, "Done.");
1772 #ifdef WALL
1773         return 0;
1774 #endif
1775 }