xid -> slice id mapping patch
[fprobe-ulog.git] / src / fprobe-ulog.c
1 /*
2         Copyright (C) Slava Astashonok <sla@0n.ru>
3
4         This program is free software; you can redistribute it and/or
5         modify it under the terms of the GNU General Public License.
6
7         $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
8
9                         Sapan Bhatia <sapanb@cs.princeton.edu> 
10                         
11         7/11/2007       Added data collection (-f) functionality, xid support in the header and log file
12                         rotation.
13
14         15/11/2007      Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
15
16 */
17
18 #include <common.h>
19
20 /* stdout, stderr, freopen() */
21 #include <stdio.h>
22
23 /* atoi(), exit() */
24 #include <stdlib.h>
25
26 /* getopt(), alarm(), getpid(), setsid(), chdir() */
27 #include <unistd.h>
28
29 /* strerror() */
30 #include <string.h>
31
32 /* sig*() */
33 #include <signal.h>
34
35 /* statfs() */
36
37 #include <sys/vfs.h>
38
39 #include <libipulog/libipulog.h>
40 #include "vserver.h"
41
42 struct ipulog_handle {
43         int fd;
44         u_int8_t blocking;
45         struct sockaddr_nl local;
46         struct sockaddr_nl peer;
47         struct nlmsghdr* last_nlhdr;
48 };
49
50 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
51 #include <sys/types.h>
52 #include <netinet/in_systm.h>
53 #include <sys/socket.h>
54 #include <netinet/in.h>
55 #include <arpa/inet.h>
56 #include <netinet/ip.h>
57 #include <netinet/tcp.h>
58 #include <netinet/udp.h>
59 #include <netinet/ip_icmp.h>
60 #include <net/if.h>
61
62 #include <sys/param.h>
63 #include <pwd.h>
64 #ifdef OS_LINUX
65 #include <grp.h>
66 #endif
67
68 /* pthread_*() */
69 #include <pthread.h>
70
71 /* errno */
72 #include <errno.h>
73
74 /* getaddrinfo() */
75 #include <netdb.h>
76
77 /* nanosleep() */
78 #include <time.h>
79
80 /* gettimeofday() */
81 #include <sys/time.h>
82
83 /* scheduling */
84 #include <sched.h>
85
86 /* select() (POSIX)*/
87 #include <sys/select.h>
88
89 /* open() */
90 #include <sys/stat.h>
91 #include <fcntl.h>
92
93 #include <fprobe-ulog.h>
94 #include <my_log.h>
95 #include <my_getopt.h>
96 #include <netflow.h>
97 #include <hash.h>
98 #include <mem.h>
99
100 #define PIDFILE                 "/var/log/fprobe-ulog.pid"
101 #define LAST_EPOCH_FILE         "/var/log/fprobe_last_epoch"
102 #define MAX_EPOCH_SIZE          sizeof("32767")
103 #define STD_NETFLOW_PDU
104
105 enum {
106         aflag,
107         Bflag,
108         bflag,
109         cflag,
110         dflag,
111         Dflag,
112         eflag,
113         Eflag,
114         fflag,
115         gflag,
116         hflag,
117         lflag,
118         mflag,
119         Mflag,
120         nflag,
121         qflag,
122         rflag,
123         sflag,
124         tflag,
125         Tflag,
126         Uflag,
127         uflag,
128         vflag,
129         Wflag,
130         Xflag,
131 };
132
133 static struct getopt_parms parms[] = {
134         {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
135         {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
136         {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
137         {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
138         {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
139         {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
140         {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
141         {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
142         {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
143         {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
144         {'h', 0, 0, 0},
145         {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
146         {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
147         {'M', 0, 0, 0},
148         {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
149         {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
150         {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
151         {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
152         {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
153         {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
154         {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
155         {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
156         {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
157         {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
158         {0, 0, 0, 0}
159 };
160
161 extern char *optarg;
162 extern int optind, opterr, optopt;
163 extern int errno;
164
165 extern struct NetFlow NetFlow1;
166 extern struct NetFlow NetFlow5;
167 extern struct NetFlow NetFlow7;
168
169 #define START_DATA_FD -5
170 #define mark_is_tos parms[Mflag].count
171 static unsigned scan_interval = 5;
172 static unsigned int min_free = 0;
173 static int frag_lifetime = 30;
174 static int inactive_lifetime = 60;
175 static int active_lifetime = 300;
176 static int sockbufsize;
177 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
178 #if (MEM_BITS == 0) || (MEM_BITS == 16)
179 #define BULK_QUANTITY 10000
180 #else
181 #define BULK_QUANTITY 200
182 #endif
183
184 static unsigned epoch_length=60, log_epochs=1; 
185 static unsigned cur_epoch=0,prev_uptime=0,last_peak=0;
186
187 static unsigned bulk_quantity = BULK_QUANTITY;
188 static unsigned pending_queue_length = 100;
189 static struct NetFlow *netflow = &NetFlow5;
190 static unsigned verbosity = 6;
191 static unsigned log_dest = MY_LOG_SYSLOG;
192 static struct Time start_time;
193 static long start_time_offset;
194 static int off_tl;
195 /* From mem.c */
196 extern unsigned total_elements;
197 extern unsigned free_elements;
198 extern unsigned total_memory;
199 #if ((DEBUG) & DEBUG_I)
200 static unsigned emit_pkts, emit_queue;
201 static uint64_t size_total;
202 static unsigned pkts_total, pkts_total_fragmented;
203 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
204 static unsigned pkts_pending, pkts_pending_done;
205 static unsigned pending_queue_trace, pending_queue_trace_candidate;
206 static unsigned flows_total, flows_fragmented;
207 #endif
208 static unsigned emit_count;
209 static uint32_t emit_sequence;
210 static unsigned emit_rate_bytes, emit_rate_delay;
211 static struct Time emit_time;
212 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
213 static pthread_t thid;
214 static sigset_t sig_mask;
215 static struct sched_param schedp;
216 static int sched_min, sched_max;
217 static int npeers, npeers_rot;
218 static struct peer *peers;
219 static int sigs;
220
221 static struct Flow *flows[1 << HASH_BITS];
222 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
223
224 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
225 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
226
227 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
228 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
229 static struct Flow *pending_head, *pending_tail;
230 static struct Flow *scan_frag_dreg;
231
232 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
233 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
234 static struct Flow *flows_emit;
235
236 static char ident[256] = "fprobe-ulog";
237 static FILE *pidfile;
238 static char *pidfilepath;
239 static pid_t pid;
240 static int killed;
241 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
242 static struct ipulog_handle *ulog_handle;
243 static uint32_t ulog_gmask = 1;
244 static char *cap_buf;
245 static int nsnmp_rules;
246 static struct snmp_rule *snmp_rules;
247 static struct passwd *pw = 0;
248
249 void usage()
250 {
251         fprintf(stdout,
252                         "fprobe-ulog: a NetFlow probe. Version %s\n"
253                         "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
254                         "\n"
255                         "-h\t\tDisplay this help\n"
256                         "-U <mask>\tULOG group bitwise mask [1]\n"
257                         "-s <seconds>\tHow often scan for expired flows [5]\n"
258                         "-g <seconds>\tFragmented flow lifetime [30]\n"
259                         "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
260                         "-f <filename>\tLog flow data in a file\n"
261                         "-G <graduation period>\tRotate logs on a 0=hourly, 1=daily basis\n"
262                         "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
263                         "-a <address>\tUse <address> as source for NetFlow flow\n"
264                         "-X <rules>\tInterface name to SNMP-index conversion rules\n"
265                         "-M\t\tUse netfilter mark value as ToS flag\n"
266                         "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
267                         "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
268                         "-q <flows>\tPending queue length [100]\n"
269                         "-B <kilobytes>\tKernel capture buffer size [0]\n"
270                         "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
271                         "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
272                         "-c <directory>\tDirectory to chroot to\n"
273                         "-u <user>\tUser to run as\n"
274                         "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
275                         "-l <[dst][:id]>\tLog destination and log/pidfile identifier [1]\n"
276                         "-y <remote:port>\tAddress of the NetFlow collector\n"
277                         "-f <writable file>\tFile to write data into\n"
278                         "-T <n>\tRotate log file every n epochs\n"
279                         "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
280                         "-E <[1..60]>\tSize of an epoch in minutes\n"
281                         "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
282                         ,
283                 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
284         exit(0);
285 }
286
287 #if ((DEBUG) & DEBUG_I)
288 void info_debug()
289 {
290         my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
291                         pkts_total, pkts_total_fragmented, size_total,
292                         pkts_pending - pkts_pending_done, pending_queue_trace);
293         my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
294                         pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
295         my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
296                         flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
297         my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
298                         total_elements, free_elements, total_memory);
299 }
300 #endif
301
302 void sighandler(int sig)
303 {
304         switch (sig) {
305                 case SIGTERM:
306                         sigs |= SIGTERM_MASK;
307                         break;
308 #if ((DEBUG) & DEBUG_I)
309                 case SIGUSR1:
310                         sigs |= SIGUSR1_MASK;
311                         break;
312 #endif
313         }
314 }
315
316 void gettime(struct Time *now)
317 {
318         struct timeval t;
319
320         gettimeofday(&t, 0);
321         now->sec = t.tv_sec;
322         now->usec = t.tv_usec;
323 }
324
325
326 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
327 {
328         return (t1->sec - t2->sec)/60;
329 }
330
331 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
332 {
333         return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
334 }
335
336 /* Uptime in milliseconds */
337 uint32_t getuptime(struct Time *t)
338 {
339         /* Maximum uptime is about 49/2 days */
340         return cmpmtime(t, &start_time);
341 }
342
343 /* Uptime in minutes */
344 uint32_t getuptime_minutes(struct Time *t)
345 {
346         /* Unlike the millisecond counter above, a minute counter won't overflow in practice */
347         return cmpMtime(t, &start_time);
348 }
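/*
 * Illustrative sketch (kept out of the build): the millisecond arithmetic
 * behind cmpmtime()/getuptime() above, and why a uint32_t result wraps after
 * roughly 49.7 days (about half of that when it passes through signed math).
 * example_uptime_ms() is hypothetical and only restates the formula.
 */
#if 0
static uint32_t example_uptime_ms(struct Time *start, struct Time *now)
{
	/* seconds scaled to milliseconds plus the microsecond remainder */
	return (now->sec - start->sec) * 1000 + (now->usec - start->usec) / 1000;
}
#endif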
349
350 hash_t hash_flow(struct Flow *flow)
351 {
352         if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
353         else return hash(flow, sizeof(struct Flow_TL));
354 }
355
356 uint16_t snmp_index(char *name) {
357         uint32_t i;
358
359         if (!*name) return 0;
360
361         for (i = 0; (int) i < nsnmp_rules; i++) {
362                 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
363                 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
364         }
365
366         if ((i = if_nametoindex(name))) return i;
367
368         return -1;
369 }
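/*
 * Illustrative sketch (kept out of the build): how one -X conversion rule is
 * applied by snmp_index() above.  With a hypothetical rule whose basename is
 * "eth", len 3 and base 100, the name "eth2" maps to SNMP index 102; names
 * that match no rule fall back to if_nametoindex().
 */
#if 0
static int example_index_for(const char *name)
{
	const int base = 100, len = 3;		/* hypothetical rule: "eth" -> base 100 */

	if (!strncmp("eth", name, len))
		return atoi(name + len) + base;	/* "eth2" -> 2 + 100 = 102 */
	return if_nametoindex(name);
}
#endif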
370
371 inline void copy_flow(struct Flow *src, struct Flow *dst)
372 {
373         dst->iif = src->iif;
374         dst->oif = src->oif;
375         dst->sip = src->sip;
376         dst->dip = src->dip;
377         dst->tos = src->tos;
378         dst->xid = src->xid;
379         dst->proto = src->proto;
380         dst->tcp_flags = src->tcp_flags;
381         dst->id = src->id;
382         dst->sp = src->sp;
383         dst->dp = src->dp;
384         dst->pkts = src->pkts;
385         dst->size = src->size;
386         dst->sizeF = src->sizeF;
387         dst->sizeP = src->sizeP;
388         dst->ctime = src->ctime;
389         dst->mtime = src->mtime;
390         dst->flags = src->flags;
391 }
392
393 void read_cur_epoch() {
394         int fd;
395         /* Reset to -1 in case the read fails */
396         cur_epoch=-1;
397         fd = open(LAST_EPOCH_FILE, O_RDONLY);
398         if (fd != -1) {
399                 char snum[MAX_EPOCH_SIZE];
400                 ssize_t len;
401                 len = read(fd, snum, MAX_EPOCH_SIZE-1);
402                 if (len != -1) {
403                         snum[len]='\0';
404                         sscanf(snum,"%u",&cur_epoch);
405                         cur_epoch++; /* Don't overwrite the last epoch's file */
406                 }
407                 close(fd);
408         }
409         return;
410 }
411
412
413 /* Dumps the current epoch in a file to cope with
414  * reboots and killings of fprobe */
415
416 void update_cur_epoch_file(int n) {
417         int fd, len;
418         char snum[MAX_EPOCH_SIZE];
419         len=snprintf(snum, MAX_EPOCH_SIZE-1,"%d", n);
420         fd = open(LAST_EPOCH_FILE, O_WRONLY|O_CREAT|O_TRUNC, 0644);
421         if (fd == -1) {
422                 my_log(LOG_ERR, "open() failed: %s. The next restart will resume logging from epoch id 0.", LAST_EPOCH_FILE);
423                 return;
424         }
425         write(fd, snum, len);
426         close(fd);
427 }
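/*
 * Illustrative sketch (kept out of the build): the persistence cycle formed
 * by update_cur_epoch_file() and read_cur_epoch() above.  LAST_EPOCH_FILE
 * holds the last epoch id as decimal text, and a restarted probe resumes at
 * the following epoch so it does not overwrite the last file it wrote.
 */
#if 0
static void example_epoch_cycle(void)
{
	update_cur_epoch_file(7);	/* writes "7" to LAST_EPOCH_FILE */
	read_cur_epoch();		/* reads "7" back and sets cur_epoch to 8 */
}
#endif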
428
429 /* Get the file descriptor corresponding to the current file.
430  * The kludgy implementation is to abstract away the 'current
431  * file descriptor', which may also be a socket. 
432  */
433
434 unsigned get_data_file_fd(char *fname, int cur_fd) {
435         struct Time now;
436         unsigned cur_uptime;
437
438         struct statfs statfs;
439         int ret_fd;
440
441         /* If the free space left on the disk drops below the threshold we start reusing
442          * old logs, and drop data altogether if that doesn't solve the problem */
443         gettime(&now);
444         cur_uptime = getuptime_minutes(&now);
445
446         if (cur_fd != START_DATA_FD) {
447                 if (fstatfs(cur_fd, &statfs) == -1) {
448                         my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
449                 }
450                 else {
451                         if (min_free && (statfs.f_bavail < min_free)
452                                         && (cur_epoch==last_peak))
453                         {
454                                 my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
455                                 cur_epoch = -1;
456                         }
457                         /*
458                            else 
459                            assume that we can reclaim space by overwriting our own files
460                            and that the difference in size will not fill the disk - sapan
461                            */
462                 }
463         }
464
465         /* If epoch length has been exceeded, 
466          * or we're starting up 
467          * or we're going back to the first epoch */
468         if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
469                 char nextname[MAX_PATH_LEN];
470                 int write_fd;
471                 prev_uptime = cur_uptime;
472                 cur_epoch = (cur_epoch + 1) % log_epochs;
473                 if (cur_epoch>last_peak) last_peak = cur_epoch;
474                 if (cur_fd>0)
475                         close(cur_fd);
476                 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
477                 if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC, 0644)) < 0) {
478                         my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
479                         exit(1);
480                 }
481                 update_cur_epoch_file(cur_epoch);
482                 ret_fd = write_fd;
483         }
484         else {
485                 ret_fd = cur_fd;
486         }
487         return(ret_fd);
488 }
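/*
 * Illustrative sketch (kept out of the build): the file naming and reuse
 * policy implemented by get_data_file_fd() above.  With -T (log_epochs) set
 * to 4, the data files "<fname>.0" .. "<fname>.3" are reused in a ring; when
 * the disk falls below the -D threshold, cur_epoch is reset so old files get
 * overwritten instead of growing the set.
 */
#if 0
static void example_next_name(const char *fname, char *out)
{
	snprintf(out, MAX_PATH_LEN, "%s.%d", fname, (cur_epoch + 1) % log_epochs);
}
#endif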
489
490 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
491 {
492         struct Flow **flowpp;
493
494 #ifdef WALL
495         flowpp = 0;
496 #endif
497
498         if (prev) flowpp = *prev;
499
500         while (where) {
501                 if (where->sip.s_addr == what->sip.s_addr
502                                 && where->dip.s_addr == what->dip.s_addr
503                                 && where->proto == what->proto) {
504                         switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
505                                 case 0:
506                                         /* Both unfragmented */
507                                         if ((what->sp == where->sp)
508                                                         && (what->dp == where->dp)) goto done;
509                                         break;
510                                 case 2:
511                                         /* Both fragmented */
512                                         if (where->id == what->id) goto done;
513                                         break;
514                         }
515                 }
516                 flowpp = &where->next;
517                 where = where->next;
518         }
519 done:
520         if (prev) *prev = flowpp;
521         return where;
522 }
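/*
 * Illustrative sketch (kept out of the build): the matching rule find()
 * above applies while walking a hash chain.  example_flows_match() is a
 * hypothetical restatement assuming FLOW_FRAGMASK covers the FLOW_FRAG bit:
 * unfragmented flows must also agree on ports, fragmented ones on the IP id.
 */
#if 0
static int example_flows_match(struct Flow *a, struct Flow *b)
{
	if (a->sip.s_addr != b->sip.s_addr
			|| a->dip.s_addr != b->dip.s_addr
			|| a->proto != b->proto) return 0;
	if (!((a->flags | b->flags) & FLOW_FRAG))
		return a->sp == b->sp && a->dp == b->dp;	/* both unfragmented */
	if ((a->flags & FLOW_FRAG) && (b->flags & FLOW_FRAG))
		return a->id == b->id;				/* both fragmented */
	return 0;						/* mixed: no match */
}
#endif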
523
524 int put_into(struct Flow *flow, int flag
525 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
526                         , char *logbuf
527 #endif
528                     )
529 {
530         int ret = 0;
531         hash_t h;
532         struct Flow *flown, **flowpp;
533 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
534         char buf[64];
535 #endif
536
537         h = hash_flow(flow);
538 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
539         sprintf(buf, " %x H:%04x", (unsigned) flow, h);
540         strcat(logbuf, buf);
541 #endif
542         pthread_mutex_lock(&flows_mutex[h]);
543         flowpp = &flows[h];
544         if (!(flown = find(flows[h], flow, &flowpp))) {
545                 /* No suitable flow found - add */
546                 if (flag == COPY_INTO) {
547                         if ((flown = mem_alloc())) {
548                                 copy_flow(flow, flown);
549                                 flow = flown;
550                         } else {
551 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
552                                 my_log(LOG_ERR, "%s %s. %s",
553                                                 "mem_alloc():", strerror(errno), "packet lost");
554 #endif
555                                 return -1;
556                         }
557                 }
558                 flow->next = flows[h];
559                 flows[h] = flow;
560 #if ((DEBUG) & DEBUG_I)
561                 flows_total++;
562                 if (flow->flags & FLOW_FRAG) flows_fragmented++;
563 #endif
564 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
565                 if (flown) {
566                         sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
567                         strcat(logbuf, buf);
568                 }
569 #endif
570         } else {
571                 /* Found suitable flow - update */
572 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
573                 sprintf(buf, " +> %x", (unsigned) flown);
574                 strcat(logbuf, buf);
575 #endif
576                 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
577                         flown->mtime = flow->mtime;
578                 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
579                         flown->ctime = flow->ctime;
580                 flown->tcp_flags |= flow->tcp_flags;
581                 flown->size += flow->size;
582                 flown->pkts += flow->pkts;
583                 if (flow->flags & FLOW_FRAG) {
584                         /* Fragmented flows require some additional work */
585                         if (flow->flags & FLOW_TL) {
586                                 /*
587                                    ?FIXME?
588                                    Several packets with FLOW_TL (attack)
589                                    */
590                                 flown->sp = flow->sp;
591                                 flown->dp = flow->dp;
592                         }
593                         if (flow->flags & FLOW_LASTFRAG) {
594                                 /*
595                                    ?FIXME?
596                                    Several packets with FLOW_LASTFRAG (attack)
597                                    */
598                                 flown->sizeP = flow->sizeP;
599                         }
600                         flown->flags |= flow->flags;
601                         flown->sizeF += flow->sizeF;
602                         if ((flown->flags & FLOW_LASTFRAG)
603                                         && (flown->sizeF >= flown->sizeP)) {
604                                 /* All fragments received - flow reassembled */
605                                 *flowpp = flown->next;
606                                 pthread_mutex_unlock(&flows_mutex[h]);
607 #if ((DEBUG) & DEBUG_I)
608                                 flows_total--;
609                                 flows_fragmented--;
610 #endif
611                                 flown->id = 0;
612                                 flown->flags &= ~FLOW_FRAG;
613 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
614                                 strcat(logbuf," R");
615 #endif
616                                 ret = put_into(flown, MOVE_INTO
617 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
618                                                 , logbuf
619 #endif
620                                               );
621                         }
622                 }
623                 if (flag == MOVE_INTO) mem_free(flow);
624         }
625         pthread_mutex_unlock(&flows_mutex[h]);
626         return ret;
627 }
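/*
 * Illustrative sketch (kept out of the build): the accounting merge that
 * put_into() above performs when an existing flow is found.  Timestamps are
 * widened, TCP flags accumulate and the byte/packet counters add up;
 * example_merge() is a hypothetical restatement of just that part.
 */
#if 0
static void example_merge(struct Flow *into, struct Flow *from)
{
	if (cmpmtime(&from->mtime, &into->mtime) > 0) into->mtime = from->mtime;
	if (cmpmtime(&from->ctime, &into->ctime) < 0) into->ctime = from->ctime;
	into->tcp_flags |= from->tcp_flags;
	into->size += from->size;
	into->pkts += from->pkts;
}
#endif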
628
629 int onlyonce=0;
630
631 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
632 {
633         int i;
634
635         for (i = 0; i < fields; i++) {
636 #if ((DEBUG) & DEBUG_F)
637                 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
638 #endif
639                 switch (format[i]) {
640                         case NETFLOW_IPV4_SRC_ADDR:
641                                 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
642                                 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
643                                 break;
644
645                         case NETFLOW_IPV4_DST_ADDR:
646                                 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
647                                 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
648                                         my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
649                                 }
650                                 p += NETFLOW_IPV4_DST_ADDR_SIZE;
651                                 break;
652
653                         case NETFLOW_INPUT_SNMP:
654                                 *((uint16_t *) p) = htons(flow->iif);
655                                 p += NETFLOW_INPUT_SNMP_SIZE;
656                                 break;
657
658                         case NETFLOW_OUTPUT_SNMP:
659                                 *((uint16_t *) p) = htons(flow->oif);
660                                 p += NETFLOW_OUTPUT_SNMP_SIZE;
661                                 break;
662
663                         case NETFLOW_PKTS_32:
664                                 *((uint32_t *) p) = htonl(flow->pkts);
665                                 p += NETFLOW_PKTS_32_SIZE;
666                                 break;
667
668                         case NETFLOW_BYTES_32:
669                                 *((uint32_t *) p) = htonl(flow->size);
670                                 p += NETFLOW_BYTES_32_SIZE;
671                                 break;
672
673                         case NETFLOW_FIRST_SWITCHED:
674                                 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
675                                 p += NETFLOW_FIRST_SWITCHED_SIZE;
676                                 break;
677
678                         case NETFLOW_LAST_SWITCHED:
679                                 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
680                                 p += NETFLOW_LAST_SWITCHED_SIZE;
681                                 break;
682
683                         case NETFLOW_L4_SRC_PORT:
684                                 *((uint16_t *) p) = flow->sp;
685                                 p += NETFLOW_L4_SRC_PORT_SIZE;
686                                 break;
687
688                         case NETFLOW_L4_DST_PORT:
689                                 *((uint16_t *) p) = flow->dp;
690                                 p += NETFLOW_L4_DST_PORT_SIZE;
691                                 break;
692
693                         case NETFLOW_PROT:
694                                 *((uint8_t *) p) = flow->proto;
695                                 p += NETFLOW_PROT_SIZE;
696                                 break;
697
698                         case NETFLOW_SRC_TOS:
699                                 *((uint8_t *) p) = flow->tos;
700                                 p += NETFLOW_SRC_TOS_SIZE;
701                                 break;
702
703                         case NETFLOW_TCP_FLAGS:
704                                 *((uint8_t *) p) = flow->tcp_flags;
705                                 p += NETFLOW_TCP_FLAGS_SIZE;
706                                 break;
707
708                         case NETFLOW_VERSION:
709                                 *((uint16_t *) p) = htons(netflow->Version);
710                                 p += NETFLOW_VERSION_SIZE;
711                                 break;
712
713                         case NETFLOW_COUNT:
714                                 *((uint16_t *) p) = htons(emit_count);
715                                 p += NETFLOW_COUNT_SIZE;
716                                 break;
717
718                         case NETFLOW_UPTIME:
719                                 *((uint32_t *) p) = htonl(getuptime(&emit_time));
720                                 p += NETFLOW_UPTIME_SIZE;
721                                 break;
722
723                         case NETFLOW_UNIX_SECS:
724                                 *((uint32_t *) p) = htonl(emit_time.sec);
725                                 p += NETFLOW_UNIX_SECS_SIZE;
726                                 break;
727
728                         case NETFLOW_UNIX_NSECS:
729                                 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
730                                 p += NETFLOW_UNIX_NSECS_SIZE;
731                                 break;
732
733                         case NETFLOW_FLOW_SEQUENCE:
734                                 //*((uint32_t *) p) = htonl(emit_sequence);
735                                 *((uint32_t *) p) = 0;
736                                 p += NETFLOW_FLOW_SEQUENCE_SIZE;
737                                 break;
738
739                         case NETFLOW_PAD8:
740                                 /* Unsupported (uint8_t) */
741                         case NETFLOW_ENGINE_TYPE:
742                         case NETFLOW_ENGINE_ID:
743                         case NETFLOW_FLAGS7_1:
744                         case NETFLOW_SRC_MASK:
745                         case NETFLOW_DST_MASK:
746                                 if (!onlyonce) {
747                                         my_log(LOG_CRIT, "Adding SRC/DST masks: this version of fprobe is seriously broken\n");
748                                         onlyonce=1;
749                                 }
750                                 *((uint8_t *) p) = 0;
751                                 p += NETFLOW_PAD8_SIZE;
752                                 break;
753                         case NETFLOW_XID:
754                                 *((uint32_t *) p) = flow->xid;
755                                 p += NETFLOW_XID_SIZE;
756                                 break;
757                         case NETFLOW_PAD16:
758                                 /* Unsupported (uint16_t) */
759                         case NETFLOW_SRC_AS:
760                         case NETFLOW_DST_AS:
761                         case NETFLOW_FLAGS7_2:
762                                 *((uint16_t *) p) = 0;
763                                 p += NETFLOW_PAD16_SIZE;
764                                 break;
765
766                         case NETFLOW_PAD32:
767                                 /* Unsupported (uint32_t) */
768                         case NETFLOW_IPV4_NEXT_HOP:
769                         case NETFLOW_ROUTER_SC:
770                                 *((uint32_t *) p) = 0;
771                                 p += NETFLOW_PAD32_SIZE;
772                                 break;
773
774                         default:
775                                 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
776                                                 format, i, format[i]);
777                                 exit(1);
778                 }
779         }
780 #if ((DEBUG) & DEBUG_F)
781         my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
782 #endif
783         return p;
784 }
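/*
 * Illustrative sketch (kept out of the build): fill() above is driven by the
 * format arrays that accompany struct NetFlow, appending one field after
 * another at the write cursor.  example_fill_record() shows a hypothetical
 * three-field record; the real layouts come from netflow->FlowFormat and
 * netflow->HeaderFormat.
 */
#if 0
static void *example_fill_record(struct Flow *flow, void *p)
{
	uint16_t format[] = { NETFLOW_IPV4_SRC_ADDR, NETFLOW_IPV4_DST_ADDR, NETFLOW_PKTS_32 };

	return fill(3, format, flow, p);	/* returns the cursor advanced past the record */
}
#endif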
785
786 void setuser() {
787         /*
788            Workaround for clone()-based threads
789            Try to change EUID independently of main thread
790            */
791         if (pw) {
792                 setgroups(0, NULL);
793                 setregid(pw->pw_gid, pw->pw_gid);
794                 setreuid(pw->pw_uid, pw->pw_uid);
795         }
796 }
797
798 void *emit_thread()
799 {
800         struct Flow *flow;
801         void *p;
802         struct timeval now;
803         struct timespec timeout;
804         int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
805
806         p = (void *) &emit_packet + netflow->HeaderSize;
807         timeout.tv_nsec = 0;
808
809         setuser();
810
811         for (;;) {
812                 pthread_mutex_lock(&emit_mutex);
813                 while (!flows_emit) {
814                         gettimeofday(&now, 0);
815                         timeout.tv_sec = now.tv_sec + emit_timeout;
816                         /* Do not wait until emit_packet is filled - it may take too long */
817                         if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
818                                 pthread_mutex_unlock(&emit_mutex);
819                                 goto sendit;
820                         }
821                 }
822                 flow = flows_emit;
823                 flows_emit = flows_emit->next;
824 #if ((DEBUG) & DEBUG_I)
825                 emit_queue--;
826 #endif          
827                 pthread_mutex_unlock(&emit_mutex);
828
829 #ifdef UPTIME_TRICK
830                 if (!emit_count) {
831                         gettime(&start_time);
832                         start_time.sec -= start_time_offset;
833                 }
834 #endif
835                 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
836                 mem_free(flow);
837                 emit_count++;
838 #ifdef PF2_DEBUG
839                 printf("Emit count = %d\n", emit_count);
840                 fflush(stdout);
841 #endif
842                 if (emit_count == netflow->MaxFlows) {
843 sendit:
844                         gettime(&emit_time);
845                         p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
846                         size = netflow->HeaderSize + emit_count * netflow->FlowSize;
847                         /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
848 #ifdef STD_NETFLOW_PDU
849                         if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
850 #endif
851                         peer_rot_cur = 0;
852                         for (i = 0; i < npeers; i++) {
853                                 if (peers[i].type == PEER_FILE) {
854                                         if (netflow->SeqOffset)
855                                                 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
856                                         peers[i].write_fd = get_data_file_fd(peers[i].fname, peers[i].write_fd);
857                                         ret = write(peers[i].write_fd, emit_packet, size);
858                                         if (ret < size) {
859
860 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
861                                                 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
862                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
863 #endif
864 #undef MESSAGES
865                                         }
866 #if ((DEBUG) & DEBUG_E)
867                                         else {
868                                                 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
869                                                                 emit_count, i + 1, peers[i].seq);
870                                         }
871 #endif
872                                         peers[i].seq += emit_count;
873
874                                         /* Rate limit */
875                                         if (emit_rate_bytes) {
876                                                 sent += size;
877                                                 delay = sent / emit_rate_bytes;
878                                                 if (delay) {
879                                                         sent %= emit_rate_bytes;
880                                                         timeout.tv_sec = 0;
881                                                         timeout.tv_nsec = emit_rate_delay * delay;
882                                                         while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
883                                                 }
884                                         }
885                                 }
886                                 else
887                                         if (peers[i].type == PEER_MIRROR) goto sendreal;
888                                         else
889                                                 if (peers[i].type == PEER_ROTATE) 
890                                                         if (peer_rot_cur++ == peer_rot_work) {
891 sendreal:
892                                                                 if (netflow->SeqOffset)
893                                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
894                                                                 ret = send(peers[i].write_fd, emit_packet, size, 0);
895                                                                 if (ret < size) {
896 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
897                                                                         my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
898                                                                                         i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
899 #endif
900                                                                 }
901 #if ((DEBUG) & DEBUG_E)
902                                                                 else {
903                                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
904                                                                                         emit_count, i + 1, peers[i].seq);
905                                                                 }
906 #endif
907                                                                 peers[i].seq += emit_count;
908
909                                                                 /* Rate limit */
910                                                                 if (emit_rate_bytes) {
911                                                                         sent += size;
912                                                                         delay = sent / emit_rate_bytes;
913                                                                         if (delay) {
914                                                                                 sent %= emit_rate_bytes;
915                                                                                 timeout.tv_sec = 0;
916                                                                                 timeout.tv_nsec = emit_rate_delay * delay;
917                                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
918                                                                         }
919                                                                 }
920                                                         }
921                         }
922                         if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
923                         emit_sequence += emit_count;
924                         emit_count = 0;
925 #if ((DEBUG) & DEBUG_I)
926                         emit_pkts++;
927 #endif
928                 }
929         }
930 }       
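/*
 * Illustrative sketch (kept out of the build): the PDU sizing used by
 * emit_thread() above, a fixed header plus one fixed-size record per flow,
 * padded up to the standard PDU size when STD_NETFLOW_PDU is defined.
 */
#if 0
static int example_pdu_size(int flow_count)
{
	int size = netflow->HeaderSize + flow_count * netflow->FlowSize;
#ifdef STD_NETFLOW_PDU
	if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
#endif
	return size;
}
#endif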
931
932 void *unpending_thread()
933 {
934         struct timeval now;
935         struct timespec timeout;
936 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
937         char logbuf[256];
938 #endif
939
940         setuser();
941
942         timeout.tv_nsec = 0;
943         pthread_mutex_lock(&unpending_mutex);
944
945         for (;;) {
946                 while (!(pending_tail->flags & FLOW_PENDING)) {
947                         gettimeofday(&now, 0);
948                         timeout.tv_sec = now.tv_sec + unpending_timeout;
949                         pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
950                 }
951
952 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
953                 *logbuf = 0;
954 #endif
955                 if (put_into(pending_tail, COPY_INTO
956 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
957                                         , logbuf
958 #endif
959                             ) < 0) {
960 #if ((DEBUG) & DEBUG_I)
961                         pkts_lost_unpending++;
962 #endif                          
963                 }
964
965 #if ((DEBUG) & DEBUG_U)
966                 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
967 #endif
968
969                 pending_tail->flags = 0;
970                 pending_tail = pending_tail->next;
971 #if ((DEBUG) & DEBUG_I)
972                 pkts_pending_done++;
973 #endif
974         }
975 }
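/*
 * Illustrative sketch (kept out of the build): cap_thread() and
 * unpending_thread() above share a preallocated ring of struct Flow entries.
 * The producer fills pending_head and sets FLOW_PENDING; the consumer drains
 * pending_tail, copies it into the hash table and clears the flag.  This is
 * a hypothetical restatement of one consumer step.
 */
#if 0
static void example_consume_one(void)
{
	if (pending_tail->flags & FLOW_PENDING) {
		put_into(pending_tail, COPY_INTO);	/* without the DEBUG logbuf argument */
		pending_tail->flags = 0;
		pending_tail = pending_tail->next;
	}
}
#endif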
976
977 void *scan_thread()
978 {
979 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
980         char logbuf[256];
981 #endif
982         int i;
983         struct Flow *flow, **flowpp;
984         struct Time now;
985         struct timespec timeout;
986
987         setuser();
988
989         timeout.tv_nsec = 0;
990         pthread_mutex_lock(&scan_mutex);
991
992         for (;;) {
993                 gettime(&now);
994                 timeout.tv_sec = now.sec + scan_interval;
995                 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
996
997                 gettime(&now);
998 #if ((DEBUG) & DEBUG_S)
999                 my_log(LOG_DEBUG, "S: %d", now.sec);
1000 #endif
1001                 for (i = 0; i < 1 << HASH_BITS ; i++) {
1002                         pthread_mutex_lock(&flows_mutex[i]);
1003                         flow = flows[i];
1004                         flowpp = &flows[i];
1005                         while (flow) {
1006                                 if (flow->flags & FLOW_FRAG) {
1007                                         /* Process fragmented flow */
1008                                         if ((now.sec - flow->mtime.sec) > frag_lifetime) {
1009                                                 /* Fragmented flow expired - put it into special chain */
1010 #if ((DEBUG) & DEBUG_I)
1011                                                 flows_fragmented--;
1012                                                 flows_total--;
1013 #endif
1014                                                 *flowpp = flow->next;
1015                                                 flow->id = 0;
1016                                                 flow->flags &= ~FLOW_FRAG;
1017                                                 flow->next = scan_frag_dreg;
1018                                                 scan_frag_dreg = flow;
1019                                                 flow = *flowpp;
1020                                                 continue;
1021                                         }
1022                                 } else {
1023                                         /* Flow is not fragmented */
1024                                         if ((now.sec - flow->mtime.sec) > inactive_lifetime
1025                                                         || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
1026                                                 /* Flow expired */
1027 #if ((DEBUG) & DEBUG_S)
1028                                                 my_log(LOG_DEBUG, "S: E %x", flow);
1029 #endif
1030 #if ((DEBUG) & DEBUG_I)
1031                                                 flows_total--;
1032 #endif
1033                                                 *flowpp = flow->next;
1034                                                 pthread_mutex_lock(&emit_mutex);
1035                                                 flow->next = flows_emit;
1036                                                 flows_emit = flow;
1037 #if ((DEBUG) & DEBUG_I)
1038                                                 emit_queue++;
1039 #endif                          
1040                                                 pthread_mutex_unlock(&emit_mutex);
1041                                                 flow = *flowpp;
1042                                                 continue;
1043                                         }
1044                                 }
1045                                 flowpp = &flow->next;
1046                                 flow = flow->next;
1047                         } /* chain loop */
1048                         pthread_mutex_unlock(&flows_mutex[i]);
1049                 } /* hash loop */
1050                 if (flows_emit) pthread_cond_signal(&emit_cond);
1051
1052                 while (scan_frag_dreg) {
1053                         flow = scan_frag_dreg;
1054                         scan_frag_dreg = flow->next;
1055 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1056                         *logbuf = 0;
1057 #endif
1058                         put_into(flow, MOVE_INTO
1059 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1060                                         , logbuf
1061 #endif
1062                                 );
1063 #if ((DEBUG) & DEBUG_S)
1064                         my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
1065 #endif
1066                 }
1067         }
1068 }
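/*
 * Illustrative sketch (kept out of the build): the expiry rules scan_thread()
 * above applies on every pass.  Fragmented flows expire after frag_lifetime
 * of silence; complete flows expire after inactive_lifetime of silence or
 * once they have been active for longer than active_lifetime.
 */
#if 0
static int example_expired(struct Flow *flow, time_t now_sec)
{
	if (flow->flags & FLOW_FRAG)
		return (now_sec - flow->mtime.sec) > frag_lifetime;
	return (now_sec - flow->mtime.sec) > inactive_lifetime
		|| (flow->mtime.sec - flow->ctime.sec) > active_lifetime;
}
#endif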
1069
1070 void *cap_thread()
1071 {
1072         struct ulog_packet_msg *ulog_msg;
1073         struct ip *nl;
1074         void *tl;
1075         struct Flow *flow;
1076         int len, off_frag, psize;
1077 #if ((DEBUG) & DEBUG_C)
1078         char buf[64];
1079         char logbuf[256];
1080 #endif
1081
1082         setuser();
1083
1084         while (!killed) {
1085                 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1086                 if (len <= 0) {
1087                         my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1088                         continue;
1089                 }
1090                 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1091
1092 #if ((DEBUG) & DEBUG_C)
1093                         sprintf(logbuf, "C: %d", ulog_msg->data_len);
1094 #endif
1095
1096                         nl = (void *) &ulog_msg->payload;
1097                         psize = ulog_msg->data_len;
1098
1099                         /* Sanity check */
1100                         if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1101 #if ((DEBUG) & DEBUG_C)
1102                                 strcat(logbuf, " U");
1103                                 my_log(LOG_DEBUG, "%s", logbuf);
1104 #endif
1105 #if ((DEBUG) & DEBUG_I)
1106                                 pkts_ignored++;
1107 #endif
1108                                 continue;
1109                         }
1110
1111                         if (pending_head->flags) {
1112 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1113                                 my_log(LOG_ERR,
1114 # if ((DEBUG) & DEBUG_C)
1115                                                 "%s %s %s", logbuf,
1116 # else
1117                                                 "%s %s",
1118 # endif
1119                                                 "pending queue full:", "packet lost");
1120 #endif
1121 #if ((DEBUG) & DEBUG_I)
1122                                 pkts_lost_capture++;
1123 #endif
1124                                 goto done;
1125                         }
1126
1127 #if ((DEBUG) & DEBUG_I)
1128                         pkts_total++;
1129 #endif
1130
1131                         flow = pending_head;
1132
1133                         /* ?FIXME? Add sanity check for ip_len? */
1134                         flow->size = ntohs(nl->ip_len);
1135 #if ((DEBUG) & DEBUG_I)
1136                         size_total += flow->size;
1137 #endif
1138
1139                         flow->sip = nl->ip_src;
1140                         flow->dip = nl->ip_dst;
1141                         flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1142                         /* It's going to be expensive calling this syscall on every flow.
1143                          * We should keep a local hash table, for now just bear the overhead... - Sapan*/
1144                         flow->xid = get_vhi_name(ulog_msg->mark);
1145                         if (flow->xid == -1 || flow->xid == 0)
1146                                 flow->xid = ulog_msg->mark;
1147
1148                         if ((flow->dip.s_addr == inet_addr("64.34.177.39")) || (flow->sip.s_addr == inet_addr("64.34.177.39"))) {
1149                                 my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->tos);
1150                         }
1151                         flow->iif = snmp_index(ulog_msg->indev_name);
1152                         flow->oif = snmp_index(ulog_msg->outdev_name);
1153                         flow->proto = nl->ip_p;
1154                         flow->id = 0;
1155                         flow->tcp_flags = 0;
1156                         flow->pkts = 1;
1157                         flow->sizeF = 0;
1158                         flow->sizeP = 0;
1159                         /* Packets captured from the OUTPUT chain don't contain a valid timestamp */
1160                         if (ulog_msg->timestamp_sec) {
1161                                 flow->ctime.sec = ulog_msg->timestamp_sec;
1162                                 flow->ctime.usec = ulog_msg->timestamp_usec;
1163                         } else gettime(&flow->ctime);
1164                         flow->mtime = flow->ctime;
1165
1166                         off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1167
1168                         /*
1169                            Offset (from network layer) to transport layer header/IP data
1170                            IOW IP header size ;-)
1171
1172                            ?FIXME?
1173                            Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1174                            */
1175                         off_tl = nl->ip_hl << 2;
1176                         tl = (void *) nl + off_tl;
1177
1178                         /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1179                         flow->sizeF = ntohs(nl->ip_len) - off_tl;
1180                         psize -= off_tl;
1181                         if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1182                         if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1183
1184                         if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1185                                 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1186 #if ((DEBUG) & DEBUG_C)
1187                                 strcat(logbuf, " F");
1188 #endif
1189 #if ((DEBUG) & DEBUG_I)
1190                                 pkts_total_fragmented++;
1191 #endif
1192                                 flow->flags |= FLOW_FRAG;
1193                                 flow->id = nl->ip_id;
1194
1195                                 if (!(ntohs(nl->ip_off) & IP_MF)) {
1196                                         /* The last fragment (IP_MF clear) tells us the whole datagram size */
1197                                         flow->flags |= FLOW_LASTFRAG;
1198                                         /* size = frag_offset*8 + data_size */
1199                                         flow->sizeP = off_frag + flow->sizeF;
1200                                 }
1201                         }
1202
1203 #if ((DEBUG) & DEBUG_C)
1204                         sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1205                         strcat(logbuf, buf);
1206                         sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1207                         strcat(logbuf, buf);
1208 #endif
1209
1210                         /*
1211                            Fortunately most of the interesting transport layer information fits
1212                            into the first 8 bytes of the IP data field (the minimal nonzero size).
1213                            Thus we don't need actual packet reassembly to build the whole
1214                            transport layer data. We only check the fragment offset for a
1215                            zero value to find the packet carrying this information.
1216                            */
1217                         if (!off_frag && psize >= 8) {
1218                                 switch (flow->proto) {
1219                                         case IPPROTO_TCP:
1220                                         case IPPROTO_UDP:
1221                                                 flow->sp = ((struct udphdr *)tl)->uh_sport;
1222                                                 flow->dp = ((struct udphdr *)tl)->uh_dport;
1223                                                 goto tl_known;
1224
1225 #ifdef ICMP_TRICK
1226                                         case IPPROTO_ICMP:
1227                                                 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1228                                                 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1229                                                 goto tl_known;
1230 #endif
1231 #ifdef ICMP_TRICK_CISCO
1232                                         case IPPROTO_ICMP:
1233                                                 flow->dp = *((int32_t *) tl);
1234                                                 goto tl_known;
1235 #endif
1236
1237                                         default:
1238                                                 /* Unknown transport layer */
1239 #if ((DEBUG) & DEBUG_C)
1240                                                 strcat(logbuf, " U");
1241 #endif
1242                                                 flow->sp = 0;
1243                                                 flow->dp = 0;
1244                                                 break;
1245
1246 tl_known:
1247 #if ((DEBUG) & DEBUG_C)
1248                                                 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1249                                                 strcat(logbuf, buf);
1250 #endif
1251                                                 flow->flags |= FLOW_TL;
1252                                 }
1253                         }
1254
1255                         /* Check for tcp flags presence (including CWR and ECE). */
1256                         if (flow->proto == IPPROTO_TCP
1257                                         && off_frag < 16
1258                                         && psize >= 16 - off_frag) {
1259                                 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1260 #if ((DEBUG) & DEBUG_C)
1261                                 sprintf(buf, " TCP:%x", flow->tcp_flags);
1262                                 strcat(logbuf, buf);
1263 #endif
1264                         }
1265
1266 #if ((DEBUG) & DEBUG_C)
1267                         sprintf(buf, " => %x", (unsigned) flow);
1268                         strcat(logbuf, buf);
1269                         my_log(LOG_DEBUG, "%s", logbuf);
1270 #endif
1271
1272 #if ((DEBUG) & DEBUG_I)
1273                         pkts_pending++;
1274                         pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1275                         if (pending_queue_trace < pending_queue_trace_candidate)
1276                                 pending_queue_trace = pending_queue_trace_candidate;
1277 #endif
1278
1279                         /* Flow complete - inform unpending_thread() about it */
1280                         pending_head->flags |= FLOW_PENDING;
1281                         pending_head = pending_head->next;
1282 done:
1283                         pthread_cond_signal(&unpending_cond);
1284                 }
1285         }
1286         return 0;
1287 }
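/*
 * Illustrative sketch (kept out of the build): the xid assignment done in
 * cap_thread() above, which is the heart of the xid -> slice id mapping.
 * The netfilter mark carried by the ULOG message is translated to a vserver
 * context id via get_vhi_name(); when no mapping exists the raw mark is kept.
 */
#if 0
static uint32_t example_xid_for_mark(uint32_t mark)
{
	int xid = get_vhi_name(mark);

	return (xid == -1 || xid == 0) ? mark : (uint32_t) xid;
}
#endif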
1288
1289 /* Copied out of CoDemux */
1290
1291 static int init_daemon() {
1292         pid_t pid;
1293         FILE *pidfile;
1294
1295         pidfile = fopen(PIDFILE, "w");
1296         if (pidfile == NULL) {
1297                 my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
1298                 return(-1);
1299         }
1300         if ((pid = fork()) < 0) {
1301                 fclose(pidfile);
1302                 my_log(LOG_ERR, "Could not fork!\n");
1303                 return(-1);
1304         }
1305         else if (pid != 0) {
1306                 /* i'm the parent, writing down the child pid  */
1307                 fprintf(pidfile, "%u\n", pid);
1308                 fclose(pidfile);
1309                 exit(0);
1310         }
1311
1312         /* close the pid file */
1313         fclose(pidfile);
1314
1315         /* routines for any daemon process
1316            1. create a new session 
1317            2. change to a known working directory
1318            3. change the file creation permission 
1319            */
1320         setsid();
1321         chdir("/var/local/fprobe");
1322         umask(0);
1323
1324         return(0);
1325 }
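/*
 * Illustrative sketch (kept out of the build): the classic POSIX
 * daemonization steps init_daemon() above performs, minus the pid file
 * bookkeeping.  The parent exits after fork(), the child starts a new
 * session and relaxes its umask.
 */
#if 0
static int example_daemonize(void)
{
	pid_t pid = fork();

	if (pid < 0) return -1;
	if (pid > 0) exit(0);		/* parent: done */
	setsid();			/* child: detach from the controlling terminal */
	chdir("/var/local/fprobe");	/* same working directory as above */
	umask(0);
	return 0;
}
#endif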
1326
1327 int main(int argc, char **argv)
1328 {
1329         char errpbuf[512];
1330         char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1331         int c, i, write_fd, memory_limit = 0;
1332         struct addrinfo hints, *res;
1333         struct sockaddr_in saddr;
1334         pthread_attr_t tattr;
1335         struct sigaction sigact;
1336         static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1337         struct timeval timeout;
1338
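             /* Query the scheduler's priority range up front so -r arguments
                can be range-checked below. */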
1339         sched_min = sched_get_priority_min(SCHED);
1340         sched_max = sched_get_priority_max(SCHED);
1341
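             /* getaddrinfo() hints: IPv4 datagram sockets only; AI_PASSIVE makes
                an empty host resolve to the wildcard address. */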
1342         memset(&saddr, 0 , sizeof(saddr));
1343         memset(&hints, 0 , sizeof(hints));
1344         hints.ai_flags = AI_PASSIVE;
1345         hints.ai_family = AF_INET;
1346         hints.ai_socktype = SOCK_DGRAM;
1347
1348         /* Process command line options */
1349
1350         opterr = 0;
1351         while ((c = my_getopt(argc, argv, parms)) != -1) {
1352                 switch (c) {
1353                         case '?':
1354                                 usage();
1355
1356                         case 'h':
1357                                 usage();
1358                 }
1359         }
1360
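             /* Numeric options are converted with atoi(); malformed values quietly
                become 0 and are range-checked (where applicable) further below. */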
1361         if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1362         if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1363         if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1364         if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1365         if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1366         if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1367         if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1368         if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1369         if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1370         if (parms[nflag].count) {
1371                 switch (atoi(parms[nflag].arg)) {
1372                         case 1:
1373                                 netflow = &NetFlow1;
1374                                 break;
1375
1376                         case 5:
1377                                 break;
1378
1379                         case 7:
1380                                 netflow = &NetFlow7;
1381                                 break;
1382
1383                         default:
1384                                 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1385                                 exit(1);
1386                 }
1387         }
1388         if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1389         if (parms[lflag].count) {
1390                 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1391                         *log_suffix++ = 0;
1392                         if (*log_suffix) {
1393                                 sprintf(errpbuf, "[%s]", log_suffix);
1394                                 strcat(ident, errpbuf);
1395                         }
1396                 }
1397                 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1398                 if (log_suffix) *--log_suffix = ':';
1399         }
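             /* Space for "PID_DIR/ident.pid": sizeof(PID_DIR) already includes a
                terminator, plus '/', ident, '.', "pid" and the trailing NUL. */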
1400         if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1401 err_malloc:
1402                 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1403                 exit(1);
1404         }
1405         sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1406         if (parms[qflag].count) {
1407                 pending_queue_length = atoi(parms[qflag].arg);
1408                 if (pending_queue_length < 1) {
1409                         fprintf(stderr, "Illegal %s\n", "pending queue length");
1410                         exit(1);
1411                 }
1412         }
1413         if (parms[rflag].count) {
1414                 schedp.sched_priority = atoi(parms[rflag].arg);
1415                 if (schedp.sched_priority
1416                                 && (schedp.sched_priority < sched_min
1417                                         || schedp.sched_priority > sched_max)) {
1418                         fprintf(stderr, "Illegal %s\n", "realtime priority");
1419                         exit(1);
1420                 }
1421         }
1422         if (parms[Bflag].count) {
1423                 sockbufsize = atoi(parms[Bflag].arg) << 10;
1424         }
1425         if (parms[bflag].count) {
1426                 bulk_quantity = atoi(parms[bflag].arg);
1427                 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1428                         fprintf(stderr, "Illegal %s\n", "bulk size");
1429                         exit(1);
1430                 }
1431         }
1432         if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
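             /* -X: SNMP interface-index rules of the form
                "basename:base[,basename:base]...", e.g. "eth:1,ppp:200". */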
1433         if (parms[Xflag].count) {
1434                 for(i = 0; parms[Xflag].arg[i]; i++)
1435                         if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1436                 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1437                         goto err_malloc;
1438                 rule = strtok(parms[Xflag].arg, ":");
1439                 for (i = 0; rule; i++) {
1440                         snmp_rules[i].len = strlen(rule);
1441                         if (snmp_rules[i].len > IFNAMSIZ) {
1442                                 fprintf(stderr, "Illegal %s\n", "interface basename");
1443                                 exit(1);
1444                         }
1445                         strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1446                         if (!*(rule - 1)) *(rule - 1) = ',';
1447                         rule = strtok(NULL, ",");
1448                         if (!rule) {
1449                                 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1450                                 exit(1);
1451                         }
1452                         snmp_rules[i].base = atoi(rule);
1453                         *(rule - 1) = ':';
1454                         rule = strtok(NULL, ":");
1455                 }
1456                 nsnmp_rules = i;
1457         }
1458         if (parms[tflag].count)
1459                 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1460         if (parms[aflag].count) {
1461                 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1462 bad_lhost:
1463                         fprintf(stderr, "Illegal %s\n", "source address");
1464                         exit(1);
1465                 } else {
1466                         saddr = *((struct sockaddr_in *) res->ai_addr);
1467                         freeaddrinfo(res);
1468                 }
1469         }
1470         if (parms[uflag].count) 
1471                 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1472                         fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1473                         exit(1);
1474                 }
1475
1476
1477         /* Process collectors parameters. Brrrr... :-[ */
1478
1479         npeers = argc - optind;
1480         if (npeers >= 1) {
1481                 /* Send to remote Netflow collector */
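                     /* Each collector argument looks like host:port[/lhost[/type]],
                        where type is 'm' (mirror, the default) or 'r' (rotate),
                        for instance "192.168.0.1:2055/0.0.0.0/m". */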
1482                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1483                 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1484                         dhost = argv[i];
1485                         if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1486                         *dport++ = 0;
1487                         if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1488                                 fprintf(stderr, "socket(): %s\n", strerror(errno));
1489                                 exit(1);
1490                         }
1491                         peers[npeers].write_fd = write_fd;
1492                         peers[npeers].type = PEER_MIRROR;
1493                         peers[npeers].laddr = saddr;
1494                         peers[npeers].seq = 0;
1495                         if ((lhost = strchr(dport, '/'))) {
1496                                 *lhost++ = 0;
1497                                 if ((type = strchr(lhost, '/'))) {
1498                                         *type++ = 0;
1499                                         switch (*type) {
1500                                                 case 0:
1501                                                 case 'm':
1502                                                         break;
1503
1504                                                 case 'r':
1505                                                         peers[npeers].type = PEER_ROTATE;
1506                                                         npeers_rot++;
1507                                                         break;
1508
1509                                                 default:
1510                                                         goto bad_collector;
1511                                         }
1512                                 }
1513                                 if (*lhost) {
1514                                         if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1515                                         peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1516                                         freeaddrinfo(res);
1517                                 }
1518                         }
1519                         if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1520                                                 sizeof(struct sockaddr_in))) {
1521                                 fprintf(stderr, "bind(): %s\n", strerror(errno));
1522                                 exit(1);
1523                         }
1524                         if (getaddrinfo(dhost, dport, &hints, &res)) {
1525 bad_collector:
1526                                 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1527                                 exit(1);
1528                         }
1529                         peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1530                         freeaddrinfo(res);
1531                         if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1532                                                 sizeof(struct sockaddr_in))) {
1533                                 fprintf(stderr, "connect(): %s\n", strerror(errno));
1534                                 exit(1);
1535                         }
1536
1537                         /* Restore command line */
1538                         if (type) *--type = '/';
1539                         if (lhost) *--lhost = '/';
1540                         *--dport = ':';
1541                 }
1542         }
1543         else if (parms[fflag].count) {
1544                 // log into a file
1545                 if (!(peers = malloc((npeers + 1) * sizeof(struct peer)))) goto err_malloc;
1546                 if (!(peers[npeers].fname = calloc(1, strnlen(parms[fflag].arg, MAX_PATH_LEN) + 1))) goto err_malloc;
1547                 strncpy(peers[npeers].fname, parms[fflag].arg, strnlen(parms[fflag].arg, MAX_PATH_LEN));
1548
1549                 peers[npeers].write_fd = START_DATA_FD;
1550                 peers[npeers].type = PEER_FILE;
1551                 peers[npeers].seq = 0;
1552
1553                 read_cur_epoch();
1554                 npeers++;
1555         }
1556         else 
1557                 usage();
1558
1559
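             /* Allocate the capture buffer and attach to the ULOG netlink
                multicast groups selected with -U. */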
1560         if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1561         ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1562         if (!ulog_handle) {
1563                 fprintf(stderr, "libipulog initialization error: %s",
1564                                 ipulog_strerror(ipulog_errno));
1565                 exit(1);
1566         }
1567         if (sockbufsize)
1568                 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1569                                         &sockbufsize, sizeof(sockbufsize)) < 0)
1570                         fprintf(stderr, "setsockopt(): %s", strerror(errno));
1571
1572         /* Daemonize (if log destination stdout-free) */
1573
1574         my_log_open(ident, verbosity, log_dest);
1575
1576         init_daemon();
1577
1578         if (!(log_dest & 2)) {
1579                 /* Crash-proofing - Sapan: the parent stays in this supervisor loop and re-forks the worker whenever it exits */
1580                 while (1) {
1581                         int pid=fork();
1582                         if (pid==-1) {
1583                                 fprintf(stderr, "fork(): %s", strerror(errno));
1584                                 exit(1);
1585                         }
1586                         else if (pid==0) {
1587                                 setsid();
1588                                 freopen("/dev/null", "r", stdin);
1589                                 freopen("/dev/null", "w", stdout);
1590                                 freopen("/dev/null", "w", stderr);
1591                                 break;
1592                         }
1593                         else {
1594                                 while (wait3(NULL,0,NULL) < 1);
1595                         }
1596                 }
1597         } else {
1598                 setvbuf(stdout, (char *)0, _IONBF, 0);
1599                 setvbuf(stderr, (char *)0, _IONBF, 0);
1600         }
1601
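             /* Append our pid to the log ident string. */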
1602         pid = getpid();
1603         sprintf(errpbuf, "[%ld]", (long) pid);
1604         strcat(ident, errpbuf);
1605
1606         /* Initialization */
1607
1608         hash_init(); /* Actually for crc16 only */
1609         mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1610         for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1611
1612 #ifdef UPTIME_TRICK
1613         /* Hope 12 days is enough :-/ */
1614         start_time_offset = 1 << 20;
1615
1616         /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1617 #endif
1618         gettime(&start_time);
1619
1620         /*
1621            Build static pending queue as circular buffer.
1622            */
1623         if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1624         pending_tail = pending_head;
1625         for (i = pending_queue_length - 1; i--;) {
1626                 if (!(pending_tail->next = mem_alloc())) {
1627 err_mem_alloc:
1628                         my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1629                         exit(1);
1630                 }
1631                 pending_tail = pending_tail->next;
1632         }
1633         pending_tail->next = pending_head;
1634         pending_tail = pending_head;
1635
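             /* Install the SIGTERM (and, with DEBUG_I, SIGUSR1) handler and block
                those signals here so worker threads inherit the blocked mask; only
                the main loop below unblocks and services them. */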
1636         sigemptyset(&sig_mask);
1637         sigact.sa_handler = &sighandler;
1638         sigact.sa_mask = sig_mask;
1639         sigact.sa_flags = 0;
1640         sigaddset(&sig_mask, SIGTERM);
1641         sigaction(SIGTERM, &sigact, 0);
1642 #if ((DEBUG) & DEBUG_I)
1643         sigaddset(&sig_mask, SIGUSR1);
1644         sigaction(SIGUSR1, &sigact, 0);
1645 #endif
1646         if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1647                 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1648                 exit(1);
1649         }
1650
1651         my_log(LOG_INFO, "Starting %s...", VERSION);
1652
1653         if (parms[cflag].count) {
1654                 if (chdir(parms[cflag].arg) || chroot(".")) {
1655                         my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1656                         exit(1);
1657                 }
1658         }
1659
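             /* Spawn the worker threads (emit, scan, unpending, capture).  With -r,
                each thread gets a successively higher realtime priority, the capture
                thread highest. */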
1660         schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1661         pthread_attr_init(&tattr);
1662         for (i = 0; i < THREADS - 1; i++) {
1663                 if (schedp.sched_priority > 0) {
1664                         if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1665                                         (pthread_attr_setschedparam(&tattr, &schedp))) {
1666                                 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1667                                 exit(1);
1668                         }
1669                 }
1670                 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1671                         my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1672                         exit(1);
1673                 }
1674                 pthread_detach(thid);
1675                 schedp.sched_priority++;
1676         }
1677
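             /* Drop privileges to the -u user: supplementary groups first, then gid,
                then uid. */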
1678         if (pw) {
1679                 if (setgroups(0, NULL)) {
1680                         my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1681                         exit(1);
1682                 }
1683                 if (setregid(pw->pw_gid, pw->pw_gid)) {
1684                         my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1685                         exit(1);
1686                 }
1687                 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1688                         my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1689                         exit(1);
1690                 }
1691         }
1692
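             /* Record our pid in pidfilepath; failure is logged but not fatal. */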
1693         if (!(pidfile = fopen(pidfilepath, "w")))
1694                 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1695         else {
1696                 fprintf(pidfile, "%ld\n", (long) pid);
1697                 fclose(pidfile);
1698         }
1699
1700         my_log(LOG_INFO, "pid: %d", pid);
1701         my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1702                         "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1703                         ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1704                         netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1705                         memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1706                         emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1707                         parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1708         for (i = 0; i < nsnmp_rules; i++) {
1709                 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1710                                 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1711         }
1712         for (i = 0; i < npeers; i++) {
1713                 switch (peers[i].type) {
1714                         case PEER_MIRROR:
1715                                 c = 'm';
1716                                 break;
1717                         case PEER_ROTATE:
1718                                 c = 'r';
1719                                 break;
                             case PEER_FILE:
                                     /* file peers have no collector address to report */
                                     my_log(LOG_INFO, "collector #%d: file %s", i + 1, peers[i].fname);
                                     continue;
1720                 }
1721                 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1722                 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1723                                 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1724         }
1725
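             /* Main loop: sleep scan_interval seconds per iteration and keep running
                until SIGTERM has been handled and the flow cache has fully drained
                (no busy elements, nothing queued for emission, pending ring empty). */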
1726         pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1727
1728         timeout.tv_usec = 0;
1729         while (!killed
1730                         || (total_elements - free_elements - pending_queue_length)
1731                         || emit_count
1732                         || pending_tail->flags) {
1733
1734                 if (!sigs) {
1735                         timeout.tv_sec = scan_interval;
1736                         select(0, 0, 0, 0, &timeout);
1737                 }
1738
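                     /* On SIGTERM: shrink every timeout so all flows expire on the next
                        scan, then wake the scan and unpending threads to flush the cache. */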
1739                 if (sigs & SIGTERM_MASK && !killed) {
1740                         sigs &= ~SIGTERM_MASK;
1741                         my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1742                         scan_interval = 1;
1743                         frag_lifetime = -1;
1744                         active_lifetime = -1;
1745                         inactive_lifetime = -1;
1746                         emit_timeout = 1;
1747                         unpending_timeout = 1;
1748                         killed = 1;
1749                         pthread_cond_signal(&scan_cond);
1750                         pthread_cond_signal(&unpending_cond);
1751                 }
1752
1753 #if ((DEBUG) & DEBUG_I)
1754                 if (sigs & SIGUSR1_MASK) {
1755                         sigs &= ~SIGUSR1_MASK;
1756                         info_debug();
1757                 }
1758 #endif
1759         }
1760         remove(pidfilepath);
1761 #if ((DEBUG) & DEBUG_I)
1762         info_debug();
1763 #endif
1764         my_log(LOG_INFO, "Done.");
1765         return 0;
1768 }