Fix memory bugs.
[fprobe-ulog.git] / src / fprobe-ulog.c
1 /*
2         Copyright (C) Slava Astashonok <sla@0n.ru>
3
4         This program is free software; you can redistribute it and/or
5         modify it under the terms of the GNU General Public License.
6
7         $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
8
9                         Sapan Bhatia <sapanb@cs.princeton.edu> 
10                         
11         7/11/2007       Added data collection (-f) functionality, slice_id support in the header and log file
12                         rotation.
13
14         15/11/2007      Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
15
16 */
17
18 #include <common.h>
19
20 /* stdout, stderr, freopen() */
21 #include <stdio.h>
22
23 /* atoi(), exit() */
24 #include <stdlib.h>
25
26 /* getopt(), alarm(), getpid(), setsid(), chdir() */
27 #include <unistd.h>
28
29 /* strerror() */
30 #include <string.h>
31
32 /* sig*() */
33 #include <signal.h>
34
35 /* statfs() */
36
37 #include <sys/vfs.h>
38
39 #include <libipulog/libipulog.h>
40 /* #include "vserver.h" */
41
/*
 * Private mirror of libipulog's connection handle so its members can be
 * accessed directly from this file.
 * NOTE(review): this duplicates the library's internal layout — it must stay
 * byte-for-byte in sync with the definition inside libipulog; verify on any
 * library upgrade.
 */
struct ipulog_handle {
	int fd;				/* netlink socket descriptor */
	u_int8_t blocking;		/* non-zero => blocking receives */
	struct sockaddr_nl local;	/* our netlink address */
	struct sockaddr_nl peer;	/* peer (kernel side) netlink address */
	struct nlmsghdr* last_nlhdr;	/* cursor into the last received batch */
};
49
50 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
51 #include <sys/types.h>
52 #include <netinet/in_systm.h>
53 #include <sys/socket.h>
54 #include <netinet/in.h>
55 #include <arpa/inet.h>
56 #include <netinet/ip.h>
57 #include <netinet/tcp.h>
58 #include <netinet/udp.h>
59 #include <netinet/ip_icmp.h>
60 #include <net/if.h>
61
62 #include <sys/param.h>
63 #include <pwd.h>
64 #ifdef OS_LINUX
65 #include <grp.h>
66 #endif
67
68 /* pthread_*() */
69 #include <pthread.h>
70
71 /* errno */
72 #include <errno.h>
73
74 /* getaddrinfo() */
75 #include <netdb.h>
76
77 /* nanosleep() */
78 #include <time.h>
79
80 /* gettimeofday() */
81 #include <sys/time.h>
82
83 /* scheduling */
84 #include <sched.h>
85
86 /* select() (POSIX)*/
87 #include <sys/select.h>
88
89 /* open() */
90 #include <sys/stat.h>
91 #include <fcntl.h>
92
93 #include <fprobe-ulog.h>
94 #include <my_log.h>
95 #include <my_getopt.h>
96 #include <netflow.h>
97 #include <hash.h>
98 #include <mem.h>
99
100 #define PIDFILE                 "/var/log/fprobe-ulog.pid"
101 #define LAST_EPOCH_FILE         "/var/log/fprobe_last_epoch"
102 #define MAX_EPOCH_SIZE          sizeof("32767")
103 #define STD_NETFLOW_PDU
104
/*
 * Indices of the command-line flags.  These values are used to index the
 * parms[] table below (e.g. mark_is_tos -> parms[Mflag].count), so the
 * order here must stay aligned with the rows of parms[].
 */
enum {
	aflag,
	Bflag,
	bflag,
	cflag,
	dflag,
	Dflag,
	eflag,
	Eflag,
	fflag,
	gflag,
	hflag,
	lflag,
	mflag,
	Mflag,
	nflag,
	qflag,
	rflag,
	sflag,
	tflag,
	Tflag,
	Uflag,
	uflag,
	vflag,
	Wflag,
	Xflag,
};
132
133 static struct getopt_parms parms[] = {
134         {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
135         {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
136         {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
137         {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
138         {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
139         {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
140         {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
141         {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
142         {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
143         {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
144         {'h', 0, 0, 0},
145         {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
146         {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
147         {'M', 0, 0, 0},
148         {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
149         {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
150         {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
151         {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
152         {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
153         {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
154         {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
155         {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
156         {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
157         {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
158         {0, 0, 0, 0}
159 };
160
161 extern char *optarg;
162 extern int optind, opterr, optopt;
163 extern int errno;
164
165 extern struct NetFlow NetFlow1;
166 extern struct NetFlow NetFlow5;
167 extern struct NetFlow NetFlow7;
168
169 #define START_DATA_FD -5
170 #define mark_is_tos parms[Mflag].count
171 static unsigned scan_interval = 5;
172 static unsigned int min_free = 0;
173 static int frag_lifetime = 30;
174 static int inactive_lifetime = 60;
175 static int active_lifetime = 300;
176 static int sockbufsize;
177 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
178 #if (MEM_BITS == 0) || (MEM_BITS == 16)
179 #define BULK_QUANTITY 10000
180 #else
181 #define BULK_QUANTITY 200
182 #endif
183
184 static unsigned epoch_length=60, log_epochs=1; 
185 static unsigned cur_epoch=0,prev_uptime=0,last_peak=0;
186
187 static unsigned bulk_quantity = BULK_QUANTITY;
188 static unsigned pending_queue_length = 100;
189 static struct NetFlow *netflow = &NetFlow5;
190 static unsigned verbosity = 6;
191 static unsigned log_dest = MY_LOG_SYSLOG;
192 static struct Time start_time;
193 static long start_time_offset;
194 static int off_tl;
195 /* From mem.c */
196 extern unsigned total_elements;
197 extern unsigned free_elements;
198 extern unsigned total_memory;
199 #if ((DEBUG) & DEBUG_I)
200 static unsigned emit_pkts, emit_queue;
201 static uint64_t size_total;
202 static unsigned pkts_total, pkts_total_fragmented;
203 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
204 static unsigned pkts_pending, pkts_pending_done;
205 static unsigned pending_queue_trace, pending_queue_trace_candidate;
206 static unsigned flows_total, flows_fragmented;
207 #endif
208 static unsigned emit_count;
209 static uint32_t emit_sequence;
210 static unsigned emit_rate_bytes, emit_rate_delay;
211 static struct Time emit_time;
212 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
213 static pthread_t thid;
214 static sigset_t sig_mask;
215 static struct sched_param schedp;
216 static int sched_min, sched_max;
217 static int npeers, npeers_rot;
218 static struct peer *peers;
219 static int sigs;
220 static char cur_output_file[MAX_PATH_LEN];
221
222 static struct Flow *flows[1 << HASH_BITS];
223 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
224
225 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
226 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
227
228 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
229 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
230 static struct Flow *pending_head, *pending_tail;
231 static struct Flow *scan_frag_dreg;
232
233 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
234 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
235 static struct Flow *flows_emit;
236
237 static char ident[256] = "fprobe-ulog";
238 static FILE *pidfile;
239 static char *pidfilepath;
240 static pid_t pid;
241 static int killed;
242 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
243 static struct ipulog_handle *ulog_handle;
244 static uint32_t ulog_gmask = 1;
245 static char *cap_buf;
246 static int nsnmp_rules;
247 static struct snmp_rule *snmp_rules;
248 static struct passwd *pw = 0;
249
250 void usage()
251 {
252         fprintf(stdout,
253                         "fprobe-ulog: a NetFlow probe. Version %s\n"
254                         "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
255                         "\n"
256                         "-h\t\tDisplay this help\n"
257                         "-U <mask>\tULOG group bitwise mask [1]\n"
258                         "-s <seconds>\tHow often scan for expired flows [5]\n"
259                         "-g <seconds>\tFragmented flow lifetime [30]\n"
260                         "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
261                         "-f <filename>\tLog flow data in a file\n"
262                         "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
263                         "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
264                         "-a <address>\tUse <address> as source for NetFlow flow\n"
265                         "-X <rules>\tInterface name to SNMP-index conversion rules\n"
266                         "-M\t\tUse netfilter mark value as ToS flag\n"
267                         "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
268                         "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
269                         "-q <flows>\tPending queue length [100]\n"
270                         "-B <kilobytes>\tKernel capture buffer size [0]\n"
271                         "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
272                         "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
273                         "-c <directory>\tDirectory to chroot to\n"
274                         "-u <user>\tUser to run as\n"
275                         "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
276                         "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
277                         "-y <remote:port>\tAddress of the NetFlow collector\n"
278                         "-f <writable file>\tFile to write data into\n"
279                         "-T <n>\tRotate log file every n epochs\n"
280                         "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
281                         "-E <[1..60]>\tSize of an epoch in minutes\n"
282                         "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
283                         ,
284                 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
285         exit(0);
286 }
287
#if ((DEBUG) & DEBUG_I)
/*
 * Dump the instrumentation counters (packet, flow, emit and memory stats)
 * at LOG_DEBUG level.  Only compiled in when DEBUG_I is enabled; triggered
 * from the SIGUSR1 path (see sighandler below).
 */
void info_debug()
{
	my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
			pkts_total, pkts_total_fragmented, size_total,
			pkts_pending - pkts_pending_done, pending_queue_trace);
	my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
			pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
	my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
			flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
	my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
			total_elements, free_elements, total_memory);
}
#endif
302
/*
 * Async signal handler: record which signal arrived by setting a bit in
 * the global `sigs`; the actual work is done later outside signal context.
 * NOTE(review): `sigs` is declared as a plain int — for strict correctness
 * it should be volatile sig_atomic_t; confirm before relying on it.
 */
void sighandler(int sig)
{
	switch (sig) {
		case SIGTERM:
			sigs |= SIGTERM_MASK;
			break;
#if ((DEBUG) & DEBUG_I)
		case SIGUSR1:
			/* Debug builds: request an info_debug() counter dump */
			sigs |= SIGUSR1_MASK;
			break;
#endif
	}
}
316
317 void gettime(struct Time *now)
318 {
319         struct timeval t;
320
321         gettimeofday(&t, 0);
322         now->sec = t.tv_sec;
323         now->usec = t.tv_usec;
324 }
325
326
327 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
328 {
329         return (t1->sec - t2->sec)/60;
330 }
331
332 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
333 {
334         return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
335 }
336
337 /* Uptime in milliseconds */
338 uint32_t getuptime(struct Time *t)
339 {
340         /* Maximum uptime is about 49/2 days */
341         return cmpmtime(t, &start_time);
342 }
343
344 /* Uptime in minutes */
345 uint32_t getuptime_minutes(struct Time *t)
346 {
347         /* Maximum uptime is about 49/2 days */
348         return cmpMtime(t, &start_time);
349 }
350
351 hash_t hash_flow(struct Flow *flow)
352 {
353         if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
354         else return hash(flow, sizeof(struct Flow_TL));
355 }
356
357 uint16_t snmp_index(char *name) {
358         uint32_t i;
359
360         if (!*name) return 0;
361
362         for (i = 0; (int) i < nsnmp_rules; i++) {
363                 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
364                 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
365         }
366
367         if ((i = if_nametoindex(name))) return i;
368
369         return -1;
370 }
371
/*
 * Copy the accounting fields of one flow record into another.
 * Deliberately field-by-field rather than a struct assignment/memcpy:
 * the `next` chaining pointer is NOT copied, so dst keeps (or gets) its
 * own position in whatever hash bucket list it belongs to.
 */
inline void copy_flow(struct Flow *src, struct Flow *dst)
{
	dst->iif = src->iif;
	dst->oif = src->oif;
	dst->sip = src->sip;
	dst->dip = src->dip;
	dst->tos = src->tos;
	dst->slice_id = src->slice_id;
	dst->proto = src->proto;
	dst->tcp_flags = src->tcp_flags;
	dst->id = src->id;
	dst->sp = src->sp;
	dst->dp = src->dp;
	dst->pkts = src->pkts;
	dst->size = src->size;
	dst->sizeF = src->sizeF;
	dst->sizeP = src->sizeP;
	dst->ctime = src->ctime;
	dst->mtime = src->mtime;
	dst->flags = src->flags;
}
393
394 void read_cur_epoch() {
395         int fd;
396         /* Reset to -1 in case the read fails */
397         cur_epoch=-1;
398         fd = open(LAST_EPOCH_FILE, O_RDONLY);
399         if (fd != -1) {
400                 char snum[MAX_EPOCH_SIZE];
401                 ssize_t len;
402                 len = read(fd, snum, MAX_EPOCH_SIZE-1);
403                 if (len != -1) {
404                         snum[len]='\0';
405                         sscanf(snum,"%d",&cur_epoch);
406                         cur_epoch++; /* Let's not stone the last epoch */
407                         close(fd);
408                 }
409         }
410         return;
411 }
412
413
414 /* Dumps the current epoch in a file to cope with
415  * reboots and killings of fprobe */
416
417 void update_cur_epoch_file(int n) {
418         int fd, len;
419         char snum[MAX_EPOCH_SIZE];
420         len=snprintf(snum, MAX_EPOCH_SIZE-1,"%d", n);
421         fd = open(LAST_EPOCH_FILE, O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
422         if (fd == -1) {
423                 my_log(LOG_ERR, "open() failed: %s.The next restart will resume logging from epoch id 0.",LAST_EPOCH_FILE);
424                 return;
425         }
426         write(fd, snum, len);
427         close(fd);
428 }
429
430 /* Get the file descriptor corresponding to the current file.
431  * The kludgy implementation is to abstract away the 'current
432  * file descriptor', which may also be a socket. 
433  */
434
/*
 * Return the fd to write flow data to, rotating the log file when the
 * current epoch expires (epoch_length minutes), when starting up
 * (cur_fd == START_DATA_FD), or when wrapping back to epoch 0.
 * On rotation the finished file is gzipped and cur_epoch is persisted.
 * Note: cur_epoch is unsigned; `cur_epoch = -1` / `cur_epoch == -1` rely
 * on the usual -1 <-> UINT_MAX conversion, and (cur_epoch + 1) % log_epochs
 * then yields 0, i.e. "restart from the first epoch".
 */
unsigned get_data_file_fd(char *fname, int cur_fd) {
	struct Time now;
	unsigned cur_uptime;

	struct statfs statfs;
	int ret_fd;

	/* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
	 * doesn't solve the problem */
	gettime(&now);
	cur_uptime = getuptime_minutes(&now);

	if (cur_fd != START_DATA_FD) {
		if (fstatfs(cur_fd, &statfs) == -1) {
			my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
		}
		else {
			/* Disk nearly full while already at the highest epoch seen:
			 * force a wrap to epoch 0 so old files get overwritten. */
			if (min_free && (statfs.f_bavail < min_free)
					&& (cur_epoch==last_peak))
			{
				my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
				cur_epoch = -1;
			}
			/*
			   else 
			   assume that we can reclaim space by overwriting our own files
			   and that the difference in size will not fill the disk - sapan
			   */
		}
	}

	/* If epoch length has been exceeded, 
	 * or we're starting up 
	 * or we're going back to the first epoch */
	if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
		int write_fd;
		prev_uptime = cur_uptime;
		cur_epoch = (cur_epoch + 1) % log_epochs;
		if (cur_epoch>last_peak) last_peak = cur_epoch;
		if (cur_fd>0) {
			close(cur_fd);
			/* Compress the finished file */
			/* NOTE(review): cur_output_file is derived from the -f
			 * argument and passed to system() unquoted — shell
			 * metacharacters in that path would be executed; consider
			 * validating fname or using fork/exec. */
			char gzip_cmd[MAX_PATH_LEN+sizeof("gzip -f ")]; 
			snprintf(gzip_cmd, MAX_PATH_LEN+sizeof("gzip -f "),"gzip -f %s",cur_output_file);
			system(gzip_cmd);
		}
		snprintf(cur_output_file,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
		if ((write_fd = open(cur_output_file, O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH)) < 0) {
			my_log(LOG_ERR, "open(): %s (%s)\n", cur_output_file, strerror(errno));
			exit(1);
		}
		if (fchmod(write_fd,S_IRUSR|S_IWUSR|S_IROTH|S_IRGRP) == -1) {
			my_log(LOG_ERR, "fchmod() failed: %s (%s). Continuing...\n", cur_output_file, strerror(errno));
		}
		update_cur_epoch_file(cur_epoch);
		ret_fd = write_fd;
	}
	else {
		ret_fd = cur_fd;
	}
	return(ret_fd);
}
497
498 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
499 {
500         struct Flow **flowpp;
501
502 #ifdef WALL
503         flowpp = 0;
504 #endif
505
506         if (prev) flowpp = *prev;
507
508         while (where) {
509                 if (where->sip.s_addr == what->sip.s_addr
510                                 && where->dip.s_addr == what->dip.s_addr
511                                 && where->proto == what->proto) {
512                         switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
513                                 case 0:
514                                         /* Both unfragmented */
515                                         if ((what->sp == where->sp)
516                                                         && (what->dp == where->dp)) goto done;
517                                         break;
518                                 case 2:
519                                         /* Both fragmented */
520                                         if (where->id == what->id) goto done;
521                                         break;
522                         }
523                 }
524                 flowpp = &where->next;
525                 where = where->next;
526         }
527 done:
528         if (prev) *prev = flowpp;
529         return where;
530 }
531
532         int put_into(struct Flow *flow, int flag
533 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
534                         , char *logbuf
535 #endif
536                     )
537 {
538         int ret = 0;
539         hash_t h;
540         struct Flow *flown, **flowpp;
541 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
542         char buf[64];
543 #endif
544
545         h = hash_flow(flow);
546 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
547         sprintf(buf, " %x H:%04x", (unsigned) flow, h);
548         strcat(logbuf, buf);
549 #endif
550         pthread_mutex_lock(&flows_mutex[h]);
551         flowpp = &flows[h];
552         if (!(flown = find(flows[h], flow, &flowpp))) {
553                 /* No suitable flow found - add */
554                 if (flag == COPY_INTO) {
555                         if ((flown = mem_alloc())) {
556                                 copy_flow(flow, flown);
557                                 flow = flown;
558                         } else {
559 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
560                                 my_log(LOG_ERR, "%s %s. %s",
561                                                 "mem_alloc():", strerror(errno), "packet lost");
562 #endif
563                                 return -1;
564                         }
565                 }
566                 flow->next = flows[h];
567                 flows[h] = flow;
568 #if ((DEBUG) & DEBUG_I)
569                 flows_total++;
570                 if (flow->flags & FLOW_FRAG) flows_fragmented++;
571 #endif
572 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
573                 if (flown) {
574                         sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
575                         strcat(logbuf, buf);
576                 }
577 #endif
578         } else {
579                 /* Found suitable flow - update */
580 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
581                 sprintf(buf, " +> %x", (unsigned) flown);
582                 strcat(logbuf, buf);
583 #endif
584                 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
585                         flown->mtime = flow->mtime;
586                 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
587                         flown->ctime = flow->ctime;
588                 flown->tcp_flags |= flow->tcp_flags;
589                 flown->size += flow->size;
590                 flown->pkts += flow->pkts;
591
592                 /* The slice_id of the first slice_id of a flow is misleading. Reset the slice_id of the flow
593                  * if a better value comes along. A good example of this is that by the time CoDemux sets the
594                  * peercred of a flow, it has already been accounted for here and attributed to root. */
595
596                 if (flown->slice_id<1)
597                                 flown->slice_id = flow->slice_id;
598
599
600                 if (flow->flags & FLOW_FRAG) {
601                         /* Fragmented flow require some additional work */
602                         if (flow->flags & FLOW_TL) {
603                                 /*
604                                    ?FIXME?
605                                    Several packets with FLOW_TL (attack)
606                                    */
607                                 flown->sp = flow->sp;
608                                 flown->dp = flow->dp;
609                         }
610                         if (flow->flags & FLOW_LASTFRAG) {
611                                 /*
612                                    ?FIXME?
613                                    Several packets with FLOW_LASTFRAG (attack)
614                                    */
615                                 flown->sizeP = flow->sizeP;
616                         }
617                         flown->flags |= flow->flags;
618                         flown->sizeF += flow->sizeF;
619                         if ((flown->flags & FLOW_LASTFRAG)
620                                         && (flown->sizeF >= flown->sizeP)) {
621                                 /* All fragments received - flow reassembled */
622                                 *flowpp = flown->next;
623                                 pthread_mutex_unlock(&flows_mutex[h]);
624 #if ((DEBUG) & DEBUG_I)
625                                 flows_total--;
626                                 flows_fragmented--;
627 #endif
628                                 flown->id = 0;
629                                 flown->flags &= ~FLOW_FRAG;
630 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
631                                 strcat(logbuf," R");
632 #endif
633                                 ret = put_into(flown, MOVE_INTO
634 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
635                                                 , logbuf
636 #endif
637                                               );
638                         }
639                 }
640                 if (flag == MOVE_INTO) mem_free(flow);
641         }
642         pthread_mutex_unlock(&flows_mutex[h]);
643         return ret;
644 }
645
646 int onlyonce=0;
647
648 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
649 {
650         int i;
651
652         for (i = 0; i < fields; i++) {
653 #if ((DEBUG) & DEBUG_F)
654                 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
655 #endif
656                 switch (format[i]) {
657                         case NETFLOW_IPV4_SRC_ADDR:
658                                 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
659                                 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
660                                 break;
661
662                         case NETFLOW_IPV4_DST_ADDR:
663                                 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
664                                 if ((flow->dip.s_addr == inet_addr("10.0.0.8"))) {
665                                         my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
666                                 }
667                                 p += NETFLOW_IPV4_DST_ADDR_SIZE;
668                                 break;
669
670                         case NETFLOW_INPUT_SNMP:
671                                 *((uint16_t *) p) = htons(flow->iif);
672                                 p += NETFLOW_INPUT_SNMP_SIZE;
673                                 break;
674
675                         case NETFLOW_OUTPUT_SNMP:
676                                 *((uint16_t *) p) = htons(flow->oif);
677                                 p += NETFLOW_OUTPUT_SNMP_SIZE;
678                                 break;
679
680                         case NETFLOW_PKTS_32:
681                                 *((uint32_t *) p) = htonl(flow->pkts);
682                                 p += NETFLOW_PKTS_32_SIZE;
683                                 break;
684
685                         case NETFLOW_BYTES_32:
686                                 *((uint32_t *) p) = htonl(flow->size);
687                                 p += NETFLOW_BYTES_32_SIZE;
688                                 break;
689
690                         case NETFLOW_FIRST_SWITCHED:
691                                 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
692                                 p += NETFLOW_FIRST_SWITCHED_SIZE;
693                                 break;
694
695                         case NETFLOW_LAST_SWITCHED:
696                                 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
697                                 p += NETFLOW_LAST_SWITCHED_SIZE;
698                                 break;
699
700                         case NETFLOW_L4_SRC_PORT:
701                                 *((uint16_t *) p) = flow->sp;
702                                 p += NETFLOW_L4_SRC_PORT_SIZE;
703                                 break;
704
705                         case NETFLOW_L4_DST_PORT:
706                                 *((uint16_t *) p) = flow->dp;
707                                 p += NETFLOW_L4_DST_PORT_SIZE;
708                                 break;
709
710                         case NETFLOW_PROT:
711                                 *((uint8_t *) p) = flow->proto;
712                                 p += NETFLOW_PROT_SIZE;
713                                 break;
714
715                         case NETFLOW_SRC_TOS:
716                                 *((uint8_t *) p) = flow->tos;
717                                 p += NETFLOW_SRC_TOS_SIZE;
718                                 break;
719
720                         case NETFLOW_TCP_FLAGS:
721                                 *((uint8_t *) p) = flow->tcp_flags;
722                                 p += NETFLOW_TCP_FLAGS_SIZE;
723                                 break;
724
725                         case NETFLOW_VERSION:
726                                 *((uint16_t *) p) = htons(netflow->Version);
727                                 p += NETFLOW_VERSION_SIZE;
728                                 break;
729
730                         case NETFLOW_COUNT:
731                                 *((uint16_t *) p) = htons(emit_count);
732                                 p += NETFLOW_COUNT_SIZE;
733                                 break;
734
735                         case NETFLOW_UPTIME:
736                                 *((uint32_t *) p) = htonl(getuptime(&emit_time));
737                                 p += NETFLOW_UPTIME_SIZE;
738                                 break;
739
740                         case NETFLOW_UNIX_SECS:
741                                 *((uint32_t *) p) = htonl(emit_time.sec);
742                                 p += NETFLOW_UNIX_SECS_SIZE;
743                                 break;
744
745                         case NETFLOW_UNIX_NSECS:
746                                 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
747                                 p += NETFLOW_UNIX_NSECS_SIZE;
748                                 break;
749
750                         case NETFLOW_FLOW_SEQUENCE:
751                                 //*((uint32_t *) p) = htonl(emit_sequence);
752                                 *((uint32_t *) p) = 0;
753                                 p += NETFLOW_FLOW_SEQUENCE_SIZE;
754                                 break;
755
756                         case NETFLOW_PAD8:
757                                 /* Unsupported (uint8_t) */
758                         case NETFLOW_ENGINE_TYPE:
759                         case NETFLOW_ENGINE_ID:
760                         case NETFLOW_FLAGS7_1:
761                         case NETFLOW_SRC_MASK:
762                         case NETFLOW_DST_MASK:
763                                 if (onlyonce) {
764                                         my_log(LOG_CRIT, "Adding SRC/DST masks: this version of fprobe is seriously broken\n");
765                                         onlyonce=1;
766                                 }
767                                 *((uint8_t *) p) = 0;
768                                 p += NETFLOW_PAD8_SIZE;
769                                 break;
770                         case NETFLOW_SLICE_ID:
771                                 *((uint32_t *) p) = flow->slice_id;
772                                 p += NETFLOW_SLICE_ID_SIZE;
773                                 break;
774                         case NETFLOW_PAD16:
775                                 /* Unsupported (uint16_t) */
776                         case NETFLOW_SRC_AS:
777                         case NETFLOW_DST_AS:
778                         case NETFLOW_FLAGS7_2:
779                                 *((uint16_t *) p) = 0;
780                                 p += NETFLOW_PAD16_SIZE;
781                                 break;
782
783                         case NETFLOW_PAD32:
784                                 /* Unsupported (uint32_t) */
785                         case NETFLOW_IPV4_NEXT_HOP:
786                         case NETFLOW_ROUTER_SC:
787                                 *((uint32_t *) p) = 0;
788                                 p += NETFLOW_PAD32_SIZE;
789                                 break;
790
791                         default:
792                                 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
793                                                 format, i, format[i]);
794                                 exit(1);
795                 }
796         }
797 #if ((DEBUG) & DEBUG_F)
798         my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
799 #endif
800         return p;
801 }
802
803 void setuser() {
804         /*
805            Workaround for clone()-based threads
806            Try to change EUID independently of main thread
807            */
808         if (pw) {
809                 setgroups(0, NULL);
810                 setregid(pw->pw_gid, pw->pw_gid);
811                 setreuid(pw->pw_uid, pw->pw_uid);
812         }
813 }
814
void *emit_thread()
{
	/*
	   Emit thread: packs flows queued on flows_emit into emit_packet and
	   flushes the PDU to every peer when it is full (netflow->MaxFlows
	   records) or when emit_timeout expires with records pending.
	   Frees each flow after serializing it.  Never returns.
	   */
	struct Flow *flow;
	void *p;		/* write cursor into emit_packet, just past the PDU header */
	struct timeval now;
	struct timespec timeout;
	int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;

	p = (void *) &emit_packet + netflow->HeaderSize;
	timeout.tv_nsec = 0;

	setuser();

	for (;;) {
		pthread_mutex_lock(&emit_mutex);
		while (!flows_emit) {
			gettimeofday(&now, 0);
			timeout.tv_sec = now.tv_sec + emit_timeout;
			/* Do not wait until emit_packet will filled - it may be too long */
			/* NOTE(review): pthread_cond_timedwait() returns 0 or a positive
			   error number (e.g. ETIMEDOUT), never -1, so this retry loop
			   executes the wait exactly once -- confirm the intent */
			int res=-1;
			while ((res=pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout))==-1) continue;

			/* Timed out (res != 0) with a partially filled packet: flush it */
			if (res && emit_count) {
				//my_log(LOG_INFO,"Timeout: %d, %d",emit_count, timeout.tv_sec);
				pthread_mutex_unlock(&emit_mutex);
				goto sendit;
			}
		}
		/* Dequeue one expired flow from the emit list */
		flow = flows_emit;
		flows_emit = flows_emit->next;
#if ((DEBUG) & DEBUG_I)
		emit_queue--;
#endif
		pthread_mutex_unlock(&emit_mutex);

#ifdef UPTIME_TRICK
		if (!emit_count) {
			gettime(&start_time);
			start_time.sec -= start_time_offset;
		}
#endif
		/* Serialize the flow record at the cursor, then release the flow */
		p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
		mem_free(flow);
		emit_count++;
#ifdef PF2_DEBUG
		printf("Emit count = %d\n", emit_count);
		fflush(stdout);
#endif
		if (emit_count == netflow->MaxFlows) {
sendit:
			gettime(&emit_time);
			/* Build the PDU header; fill() returns a pointer just past it,
			   which resets the cursor for the next batch of records */
			p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
			size = netflow->HeaderSize + emit_count * netflow->FlowSize;
			/* Netflow PDUs need to be padded to 1464 bytes - Sapan */
#ifdef STD_NETFLOW_PDU
			if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
#endif
			peer_rot_cur = 0;
			for (i = 0; i < npeers; i++) {
				if (peers[i].type == PEER_FILE) {
					/* NOTE(review): patches peers[0].seq into the packet while
					   the mirror branch below uses peers[i].seq -- looks like a
					   copy/paste slip; confirm before changing */
					if (netflow->SeqOffset)
						*((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
					peers[i].write_fd = get_data_file_fd(peers[i].fname, peers[i].write_fd);
					ret = write(peers[i].write_fd, emit_packet, size);
					if (ret < size) {

#if ((DEBUG) & DEBUG_E) || defined MESSAGES
						my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
								i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
#endif
#undef MESSAGES
					}
#if ((DEBUG) & DEBUG_E)
					else {
						my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
								emit_count, i + 1, peers[i].seq);
					}
#endif
					peers[i].seq += emit_count;

					/* Rate limit */
					if (emit_rate_bytes) {
						sent += size;
						delay = sent / emit_rate_bytes;
						if (delay) {
							sent %= emit_rate_bytes;
							timeout.tv_sec = 0;
							timeout.tv_nsec = emit_rate_delay * delay;
							while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
						}
					}
				}
				else
					if (peers[i].type == PEER_MIRROR) goto sendreal;
					else
						if (peers[i].type == PEER_ROTATE)
							/* Round-robin: only the currently selected
							   rotate peer gets this PDU */
							if (peer_rot_cur++ == peer_rot_work) {
sendreal:
								if (netflow->SeqOffset)
									*((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
								ret = send(peers[i].write_fd, emit_packet, size, 0);
								if (ret < size) {
#if ((DEBUG) & DEBUG_E) || defined MESSAGES
									my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
											i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
#endif
								}
#if ((DEBUG) & DEBUG_E)
								else {
									my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
											emit_count, i + 1, peers[i].seq);
								}
#endif
								peers[i].seq += emit_count;

								/* Rate limit */
								if (emit_rate_bytes) {
									sent += size;
									delay = sent / emit_rate_bytes;
									if (delay) {
										sent %= emit_rate_bytes;
										timeout.tv_sec = 0;
										timeout.tv_nsec = emit_rate_delay * delay;
										while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
									}
								}
							}
			}
			/* Advance the round-robin pointer over the PEER_ROTATE peers */
			if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
			emit_sequence += emit_count;
			emit_count = 0;
#if ((DEBUG) & DEBUG_I)
			emit_pkts++;
#endif
		}
	}
}
952
void *unpending_thread()
{
	/*
	   Unpending thread: moves completed flows from the pending ring
	   (filled by cap_thread) into the main hash table via put_into(),
	   waking on unpending_cond or every unpending_timeout seconds.
	   Never returns.
	   */
	struct timeval now;
	struct timespec timeout;
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
	char logbuf[256];
#endif

	setuser();

	timeout.tv_nsec = 0;
	/* unpending_mutex stays held for the whole life of the thread;
	   it is released only inside pthread_cond_timedwait() */
	pthread_mutex_lock(&unpending_mutex);

	for (;;) {
		/* Wait until the tail slot of the ring is marked FLOW_PENDING */
		while (!(pending_tail->flags & FLOW_PENDING)) {
			gettimeofday(&now, 0);
			timeout.tv_sec = now.tv_sec + unpending_timeout;
			/* NOTE(review): pthread_cond_timedwait() never returns -1
			   (it returns 0 or an error number), so this never retries */
			while (pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout)==-1)
				continue;
		}

#if ((DEBUG) & (DEBUG_S | DEBUG_U))
		*logbuf = 0;
#endif
		/* Copy the flow into the hash table; negative return means dropped */
		if (put_into(pending_tail, COPY_INTO
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
					, logbuf
#endif
			    ) < 0) {
#if ((DEBUG) & DEBUG_I)
			pkts_lost_unpending++;
#endif
		}

#if ((DEBUG) & DEBUG_U)
		my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
#endif

		/* Clearing flags returns the slot to cap_thread; then advance */
		pending_tail->flags = 0;
		pending_tail = pending_tail->next;
#if ((DEBUG) & DEBUG_I)
		pkts_pending_done++;
#endif
	}
}
998
void *scan_thread()
{
	/*
	   Scan thread: wakes every scan_interval seconds and walks the whole
	   flow hash table.  Expired ordinary flows are unlinked and pushed
	   onto flows_emit for emit_thread; expired fragmented flows are
	   collected on scan_frag_dreg and re-registered via put_into().
	   Never returns.
	   */
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
	char logbuf[256];
#endif
	int i;
	struct Flow *flow, **flowpp;	/* current flow and the link that points to it */
	struct Time now;
	struct timespec timeout;

	setuser();

	timeout.tv_nsec = 0;
	/* scan_mutex stays held for the whole life of the thread */
	pthread_mutex_lock(&scan_mutex);

	for (;;) {
		gettime(&now);
		timeout.tv_sec = now.sec + scan_interval;
		/* NOTE(review): pthread_cond_timedwait() never returns -1, so the
		   wait executes exactly once per iteration */
		while (pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout)==-1)
			continue;

		gettime(&now);
#if ((DEBUG) & DEBUG_S)
		my_log(LOG_DEBUG, "S: %d", now.sec);
#endif
		for (i = 0; i < 1 << HASH_BITS ; i++) {
			pthread_mutex_lock(&flows_mutex[i]);
			flow = flows[i];
			flowpp = &flows[i];
			while (flow) {
				if (flow->flags & FLOW_FRAG) {
					/* Process fragmented flow */
					if ((now.sec - flow->mtime.sec) > frag_lifetime) {
						/* Fragmented flow expired - put it into special chain */
#if ((DEBUG) & DEBUG_I)
						flows_fragmented--;
						flows_total--;
#endif
						/* Unlink from the hash chain, strip the FRAG state
						   and park on scan_frag_dreg for later put_into() */
						*flowpp = flow->next;
						flow->id = 0;
						flow->flags &= ~FLOW_FRAG;
						flow->next = scan_frag_dreg;
						scan_frag_dreg = flow;
						flow = *flowpp;
						continue;
					}
				} else {
					/* Flow is not frgamented */
					if ((now.sec - flow->mtime.sec) > inactive_lifetime
							|| (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
						/* Flow expired */
#if ((DEBUG) & DEBUG_S)
						my_log(LOG_DEBUG, "S: E %x", flow);
#endif
#if ((DEBUG) & DEBUG_I)
						flows_total--;
#endif
						/* Unlink and hand ownership to emit_thread */
						*flowpp = flow->next;
						pthread_mutex_lock(&emit_mutex);
						flow->next = flows_emit;
						flows_emit = flow;
#if ((DEBUG) & DEBUG_I)
						emit_queue++;
#endif
						pthread_mutex_unlock(&emit_mutex);
						flow = *flowpp;
						continue;
					}
				}
				flowpp = &flow->next;
				flow = flow->next;
			} /* chain loop */
			pthread_mutex_unlock(&flows_mutex[i]);
		} /* hash loop */
		if (flows_emit) pthread_cond_signal(&emit_cond);

		/* Re-register expired fragments as ordinary (non-FRAG) flows */
		while (scan_frag_dreg) {
			flow = scan_frag_dreg;
			scan_frag_dreg = flow->next;
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
			*logbuf = 0;
#endif
			put_into(flow, MOVE_INTO
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
					, logbuf
#endif
				);
#if ((DEBUG) & DEBUG_S)
			my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
#endif
		}
	}
}
1092
void *cap_thread()
{
	/*
	   Capture thread: reads batches of ULOG netlink messages, parses each
	   IPv4 packet into the next free slot of the pending ring
	   (pending_head) and signals unpending_thread.  Loops until the
	   'killed' flag is set, then returns 0.
	   */
	struct ulog_packet_msg *ulog_msg;
	struct ip *nl;		/* network layer (IPv4) header of the captured packet */
	void *tl;		/* transport layer header, nl + IP header length */
	struct Flow *flow;
	int len, off_frag, psize;
#if ((DEBUG) & DEBUG_C)
	char buf[64];
	char logbuf[256];
#endif
	int challenge;		/* NOTE(review): unused in this function */

	setuser();

	while (!killed) {
		/* Blocking read of one netlink batch into cap_buf */
		len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
		if (len <= 0) {
			my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
			continue;
		}
		/* Iterate over every packet contained in this batch */
		while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {

#if ((DEBUG) & DEBUG_C)
			sprintf(logbuf, "C: %d", ulog_msg->data_len);
#endif

			nl = (void *) &ulog_msg->payload;
			psize = ulog_msg->data_len;	/* bytes actually captured */

			/* Sanity check */
			if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
#if ((DEBUG) & DEBUG_C)
				strcat(logbuf, " U");
				my_log(LOG_DEBUG, "%s", logbuf);
#endif
#if ((DEBUG) & DEBUG_I)
				pkts_ignored++;
#endif
				continue;
			}

			/* Nonzero flags means the head slot is still owned by
			   unpending_thread: the pending ring is full, drop the packet */
			if (pending_head->flags) {
#if ((DEBUG) & DEBUG_C) || defined MESSAGES
				my_log(LOG_ERR,
# if ((DEBUG) & DEBUG_C)
						"%s %s %s", logbuf,
# else
						"%s %s",
# endif
						"pending queue full:", "packet lost");
#endif
#if ((DEBUG) & DEBUG_I)
				pkts_lost_capture++;
#endif
				goto done;
			}

#if ((DEBUG) & DEBUG_I)
			pkts_total++;
#endif

			flow = pending_head;

			/* ?FIXME? Add sanity check for ip_len? */
			flow->size = ntohs(nl->ip_len);
#if ((DEBUG) & DEBUG_I)
			size_total += flow->size;
#endif

			flow->sip = nl->ip_src;
			flow->dip = nl->ip_dst;
			flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;

			/* It's going to be expensive calling this syscall on every flow.
			 * We should keep a local hash table, for now just bear the overhead... - Sapan*/

			flow->slice_id = ulog_msg->mark;

			/*if (flow->slice_id < 1)
				flow->slice_id = ulog_msg->mark; // Couldn't look up the slice id, let's at least store the local xid*/


			if ((flow->dip.s_addr == inet_addr("10.0.0.8")) || (flow->sip.s_addr == inet_addr("10.0.0.8"))) {
				my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->slice_id);
			}
			flow->iif = snmp_index(ulog_msg->indev_name);
			flow->oif = snmp_index(ulog_msg->outdev_name);
			flow->proto = nl->ip_p;
			flow->id = 0;
			flow->tcp_flags = 0;
			flow->pkts = 1;
			flow->sizeF = 0;
			flow->sizeP = 0;
			/* Packets captured from OUTPUT table didn't contains valid timestamp */
			if (ulog_msg->timestamp_sec) {
				flow->ctime.sec = ulog_msg->timestamp_sec;
				flow->ctime.usec = ulog_msg->timestamp_usec;
			} else gettime(&flow->ctime);
			flow->mtime = flow->ctime;

			/* Fragment offset in bytes (the header field counts 8-byte units) */
			off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;

			/*
			   Offset (from network layer) to transport layer header/IP data
			   IOW IP header size ;-)

			   ?FIXME?
			   Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
			   */
			/* NOTE(review): off_tl is not declared in this function --
			   presumably file-scope; verify no other thread writes it */
			off_tl = nl->ip_hl << 2;
			tl = (void *) nl + off_tl;

			/* THIS packet data size: data_size = total_size - ip_header_size*4 */
			flow->sizeF = ntohs(nl->ip_len) - off_tl;
			psize -= off_tl;
			if ((signed) flow->sizeF < 0) flow->sizeF = 0;
			if (psize > (signed) flow->sizeF) psize = flow->sizeF;

			if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
				/* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
#if ((DEBUG) & DEBUG_C)
				strcat(logbuf, " F");
#endif
#if ((DEBUG) & DEBUG_I)
				pkts_total_fragmented++;
#endif
				flow->flags |= FLOW_FRAG;
				flow->id = nl->ip_id;

				if (!(ntohs(nl->ip_off) & IP_MF)) {
					/* Packet whith IP_MF contains information about whole datagram size */
					flow->flags |= FLOW_LASTFRAG;
					/* size = frag_offset*8 + data_size */
					flow->sizeP = off_frag + flow->sizeF;
				}
			}

#if ((DEBUG) & DEBUG_C)
			sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
			strcat(logbuf, buf);
			sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
			strcat(logbuf, buf);
#endif

			/*
			   Fortunately most interesting transport layer information fit
			   into first 8 bytes of IP data field (minimal nonzero size).
			   Thus we don't need actual packet reassembling to build whole
			   transport layer data. We only check the fragment offset for
			   zero value to find packet with this information.
			   */
			if (!off_frag && psize >= 8) {
				switch (flow->proto) {
					case IPPROTO_TCP:
					case IPPROTO_UDP:
						/* TCP and UDP keep ports at the same offsets */
						flow->sp = ((struct udphdr *)tl)->uh_sport;
						flow->dp = ((struct udphdr *)tl)->uh_dport;
						goto tl_known;

#ifdef ICMP_TRICK
					case IPPROTO_ICMP:
						flow->sp = htons(((struct icmp *)tl)->icmp_type);
						flow->dp = htons(((struct icmp *)tl)->icmp_code);
						goto tl_known;
#endif
#ifdef ICMP_TRICK_CISCO
					case IPPROTO_ICMP:
						flow->dp = *((int32_t *) tl);
						goto tl_known;
#endif

					default:
						/* Unknown transport layer */
#if ((DEBUG) & DEBUG_C)
						strcat(logbuf, " U");
#endif
						flow->sp = 0;
						flow->dp = 0;
						break;

tl_known:
#if ((DEBUG) & DEBUG_C)
						sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
						strcat(logbuf, buf);
#endif
						flow->flags |= FLOW_TL;
				}
			}

			/* Check for tcp flags presence (including CWR and ECE). */
			if (flow->proto == IPPROTO_TCP
					&& off_frag < 16
					&& psize >= 16 - off_frag) {
				/* TCP flags byte sits at offset 13 of the TCP header */
				flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
#if ((DEBUG) & DEBUG_C)
				sprintf(buf, " TCP:%x", flow->tcp_flags);
				strcat(logbuf, buf);
#endif
			}

#if ((DEBUG) & DEBUG_C)
			sprintf(buf, " => %x", (unsigned) flow);
			strcat(logbuf, buf);
			my_log(LOG_DEBUG, "%s", logbuf);
#endif

#if ((DEBUG) & DEBUG_I)
			pkts_pending++;
			pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
			if (pending_queue_trace < pending_queue_trace_candidate)
				pending_queue_trace = pending_queue_trace_candidate;
#endif

			/* Flow complete - inform unpending_thread() about it */
			pending_head->flags |= FLOW_PENDING;
			pending_head = pending_head->next;
done:
			pthread_cond_signal(&unpending_cond);
		}
	}
	return 0;
}
1316
1317 /* Copied out of CoDemux */
1318
1319 static int init_daemon() {
1320         pid_t pid;
1321         FILE *pidfile;
1322
1323         pidfile = fopen(PIDFILE, "w");
1324         if (pidfile == NULL) {
1325                 my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
1326         }
1327
1328         if ((pid = fork()) < 0) {
1329                 fclose(pidfile);
1330                 my_log(LOG_ERR, "Could not fork!\n");
1331                 return(-1);
1332         }
1333         else if (pid != 0) {
1334                 /* i'm the parent, writing down the child pid  */
1335                 fprintf(pidfile, "%u\n", pid);
1336                 fclose(pidfile);
1337                 exit(0);
1338         }
1339
1340         /* close the pid file */
1341         fclose(pidfile);
1342
1343         /* routines for any daemon process
1344            1. create a new session 
1345            2. change directory to the root
1346            3. change the file creation permission 
1347            */
1348         setsid();
1349         chdir("/var/local/fprobe");
1350         umask(0);
1351
1352         return(0);
1353 }
1354
1355 int main(int argc, char **argv)
1356 {
1357         char errpbuf[512];
1358         char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1359         int c, i, write_fd, memory_limit = 0;
1360         struct addrinfo hints, *res;
1361         struct sockaddr_in saddr;
1362         pthread_attr_t tattr;
1363         struct sigaction sigact;
1364         static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1365         struct timeval timeout;
1366
1367         sched_min = sched_get_priority_min(SCHED);
1368         sched_max = sched_get_priority_max(SCHED);
1369
1370         memset(&saddr, 0 , sizeof(saddr));
1371         memset(&hints, 0 , sizeof(hints));
1372         hints.ai_flags = AI_PASSIVE;
1373         hints.ai_family = AF_INET;
1374         hints.ai_socktype = SOCK_DGRAM;
1375
1376         /* Process command line options */
1377
1378         opterr = 0;
1379         while ((c = my_getopt(argc, argv, parms)) != -1) {
1380                 switch (c) {
1381                         case '?':
1382                                 usage();
1383
1384                         case 'h':
1385                                 usage();
1386                 }
1387         }
1388
1389         if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1390         if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1391         if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1392         if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1393         if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1394         if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1395         if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1396         if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1397         if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1398         if (parms[nflag].count) {
1399                 switch (atoi(parms[nflag].arg)) {
1400                         case 1:
1401                                 netflow = &NetFlow1;
1402                                 break;
1403
1404                         case 5:
1405                                 break;
1406
1407                         case 7:
1408                                 netflow = &NetFlow7;
1409                                 break;
1410
1411                         default:
1412                                 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1413                                 exit(1);
1414                 }
1415         }
1416         if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1417         if (parms[lflag].count) {
1418                 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1419                         *log_suffix++ = 0;
1420                         if (*log_suffix) {
1421                                 sprintf(errpbuf, "[%s]", log_suffix);
1422                                 strcat(ident, errpbuf);
1423                         }
1424                 }
1425                 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1426                 if (log_suffix) *--log_suffix = ':';
1427         }
1428         if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1429 err_malloc:
1430                 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1431                 exit(1);
1432         }
1433         sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1434         if (parms[qflag].count) {
1435                 pending_queue_length = atoi(parms[qflag].arg);
1436                 if (pending_queue_length < 1) {
1437                         fprintf(stderr, "Illegal %s\n", "pending queue length");
1438                         exit(1);
1439                 }
1440         }
1441         if (parms[rflag].count) {
1442                 schedp.sched_priority = atoi(parms[rflag].arg);
1443                 if (schedp.sched_priority
1444                                 && (schedp.sched_priority < sched_min
1445                                         || schedp.sched_priority > sched_max)) {
1446                         fprintf(stderr, "Illegal %s\n", "realtime priority");
1447                         exit(1);
1448                 }
1449         }
1450         if (parms[Bflag].count) {
1451                 sockbufsize = atoi(parms[Bflag].arg) << 10;
1452         }
1453         if (parms[bflag].count) {
1454                 bulk_quantity = atoi(parms[bflag].arg);
1455                 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1456                         fprintf(stderr, "Illegal %s\n", "bulk size");
1457                         exit(1);
1458                 }
1459         }
1460         if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1461         if (parms[Xflag].count) {
1462                 for(i = 0; parms[Xflag].arg[i]; i++)
1463                         if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1464                 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1465                         goto err_malloc;
1466                 rule = strtok(parms[Xflag].arg, ":");
1467                 for (i = 0; rule; i++) {
1468                         snmp_rules[i].len = strlen(rule);
1469                         if (snmp_rules[i].len > IFNAMSIZ) {
1470                                 fprintf(stderr, "Illegal %s\n", "interface basename");
1471                                 exit(1);
1472                         }
1473                         strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1474                         if (!*(rule - 1)) *(rule - 1) = ',';
1475                         rule = strtok(NULL, ",");
1476                         if (!rule) {
1477                                 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1478                                 exit(1);
1479                         }
1480                         snmp_rules[i].base = atoi(rule);
1481                         *(rule - 1) = ':';
1482                         rule = strtok(NULL, ":");
1483                 }
1484                 nsnmp_rules = i;
1485         }
1486         if (parms[tflag].count)
1487                 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1488         if (parms[aflag].count) {
1489                 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1490 bad_lhost:
1491                         fprintf(stderr, "Illegal %s\n", "source address");
1492                         exit(1);
1493                 } else {
1494                         saddr = *((struct sockaddr_in *) res->ai_addr);
1495                         freeaddrinfo(res);
1496                 }
1497         }
1498         if (parms[uflag].count) 
1499                 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1500                         fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1501                         exit(1);
1502                 }
1503
1504
1505         /* Process collectors parameters. Brrrr... :-[ */
1506
1507         npeers = argc - optind;
1508         if (npeers >= 1) {
1509                 /* Send to remote Netflow collector */
1510                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1511                 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1512                         dhost = argv[i];
1513                         if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1514                         *dport++ = 0;
1515                         if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1516                                 fprintf(stderr, "socket(): %s\n", strerror(errno));
1517                                 exit(1);
1518                         }
1519                         peers[npeers].write_fd = write_fd;
1520                         peers[npeers].type = PEER_MIRROR;
1521                         peers[npeers].laddr = saddr;
1522                         peers[npeers].seq = 0;
1523                         if ((lhost = strchr(dport, '/'))) {
1524                                 *lhost++ = 0;
1525                                 if ((type = strchr(lhost, '/'))) {
1526                                         *type++ = 0;
1527                                         switch (*type) {
1528                                                 case 0:
1529                                                 case 'm':
1530                                                         break;
1531
1532                                                 case 'r':
1533                                                         peers[npeers].type = PEER_ROTATE;
1534                                                         npeers_rot++;
1535                                                         break;
1536
1537                                                 default:
1538                                                         goto bad_collector;
1539                                         }
1540                                 }
1541                                 if (*lhost) {
1542                                         if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1543                                         peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1544                                         freeaddrinfo(res);
1545                                 }
1546                         }
1547                         if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1548                                                 sizeof(struct sockaddr_in))) {
1549                                 fprintf(stderr, "bind(): %s\n", strerror(errno));
1550                                 exit(1);
1551                         }
1552                         if (getaddrinfo(dhost, dport, &hints, &res)) {
1553 bad_collector:
1554                                 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1555                                 exit(1);
1556                         }
1557                         peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1558                         freeaddrinfo(res);
1559                         if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1560                                                 sizeof(struct sockaddr_in))) {
1561                                 fprintf(stderr, "connect(): %s\n", strerror(errno));
1562                                 exit(1);
1563                         }
1564
1565                         /* Restore command line */
1566                         if (type) *--type = '/';
1567                         if (lhost) *--lhost = '/';
1568                         *--dport = ':';
1569                 }
1570         }
1571         else if (parms[fflag].count) {
1572                 // log into a file
1573                 if (!(peers = malloc(sizeof(struct peer)))) goto err_malloc;
1574                 if (!(peers[npeers].fname = strndup(parms[fflag].arg, MAX_PATH_LEN))) goto err_malloc;
1575                 peers[npeers].write_fd = START_DATA_FD;
1576                 peers[npeers].type = PEER_FILE;
1577                 peers[npeers].seq = 0;
1578
1579                 read_cur_epoch();
1580                 npeers++;
1581         }
1582         else 
1583                 usage();
1584
1585
1586         if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1587         ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1588         if (!ulog_handle) {
1589                 fprintf(stderr, "libipulog initialization error: %s",
1590                                 ipulog_strerror(ipulog_errno));
1591                 exit(1);
1592         }
1593         if (sockbufsize)
1594                 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1595                                         &sockbufsize, sizeof(sockbufsize)) < 0)
1596                         fprintf(stderr, "setsockopt(): %s", strerror(errno));
1597
1598         /* Daemonize (if log destination stdout-free) */
1599
1600         my_log_open(ident, verbosity, log_dest);
1601
1602         init_daemon();
1603
1604         if (!(log_dest & 2)) {
1605                 /* Crash-proofing - Sapan*/
1606                 while (1) {
1607                         int pid=fork();
1608                         if (pid==-1) {
1609                                 fprintf(stderr, "fork(): %s", strerror(errno));
1610                                 exit(1);
1611                         }
1612                         else if (pid==0) {
1613                                 setsid();
1614                                 freopen("/dev/null", "r", stdin);
1615                                 freopen("/dev/null", "w", stdout);
1616                                 freopen("/dev/null", "w", stderr);
1617                                 break;
1618                         }
1619                         else {
1620                                 while (wait3(NULL,0,NULL) < 1);
1621                         }
1622                 }
1623         } else {
1624                 setvbuf(stdout, (char *)0, _IONBF, 0);
1625                 setvbuf(stderr, (char *)0, _IONBF, 0);
1626         }
1627
1628         pid = getpid();
1629         sprintf(errpbuf, "[%ld]", (long) pid);
1630         strcat(ident, errpbuf);
1631
1632         /* Initialization */
1633
1634     // init_slice_id_hash();
1635         hash_init(); /* Actually for crc16 only */
1636         mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1637         for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1638
1639 #ifdef UPTIME_TRICK
1640         /* Hope 12 days is enough :-/ */
1641         start_time_offset = 1 << 20;
1642
1643         /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1644 #endif
1645         gettime(&start_time);
1646
1647         /*
1648            Build static pending queue as circular buffer.
1649            */
1650         if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1651         pending_tail = pending_head;
1652         for (i = pending_queue_length - 1; i--;) {
1653                 if (!(pending_tail->next = mem_alloc())) {
1654 err_mem_alloc:
1655                         my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1656                         exit(1);
1657                 }
1658                 pending_tail = pending_tail->next;
1659         }
1660         pending_tail->next = pending_head;
1661         pending_tail = pending_head;
1662
1663         sigemptyset(&sig_mask);
1664         sigact.sa_handler = &sighandler;
1665         sigact.sa_mask = sig_mask;
1666         sigact.sa_flags = 0;
1667         sigaddset(&sig_mask, SIGTERM);
1668         sigaction(SIGTERM, &sigact, 0);
1669 #if ((DEBUG) & DEBUG_I)
1670         sigaddset(&sig_mask, SIGUSR1);
1671         sigaction(SIGUSR1, &sigact, 0);
1672 #endif
1673         if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1674                 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1675                 exit(1);
1676         }
1677
1678         my_log(LOG_INFO, "Starting %s...", VERSION);
1679
1680         if (parms[cflag].count) {
1681                 if (chdir(parms[cflag].arg) || chroot(".")) {
1682                         my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1683                         exit(1);
1684                 }
1685         }
1686
1687         schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1688         pthread_attr_init(&tattr);
1689         for (i = 0; i < THREADS - 1; i++) {
1690                 if (schedp.sched_priority > 0) {
1691                         if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1692                                         (pthread_attr_setschedparam(&tattr, &schedp))) {
1693                                 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1694                                 exit(1);
1695                         }
1696                 }
1697                 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1698                         my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1699                         exit(1);
1700                 }
1701                 pthread_detach(thid);
1702                 schedp.sched_priority++;
1703         }
1704
1705         if (pw) {
1706                 if (setgroups(0, NULL)) {
1707                         my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1708                         exit(1);
1709                 }
1710                 if (setregid(pw->pw_gid, pw->pw_gid)) {
1711                         my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1712                         exit(1);
1713                 }
1714                 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1715                         my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1716                         exit(1);
1717                 }
1718         }
1719
1720         if (!(pidfile = fopen(pidfilepath, "w")))
1721                 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1722         else {
1723                 fprintf(pidfile, "%ld\n", (long) pid);
1724                 fclose(pidfile);
1725         }
1726
1727         my_log(LOG_INFO, "pid: %d", pid);
1728         my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1729                         "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1730                         ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1731                         netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1732                         memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1733                         emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1734                         parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1735         for (i = 0; i < nsnmp_rules; i++) {
1736                 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1737                                 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1738         }
1739         for (i = 0; i < npeers; i++) {
1740                 switch (peers[i].type) {
1741                         case PEER_MIRROR:
1742                                 c = 'm';
1743                                 break;
1744                         case PEER_ROTATE:
1745                                 c = 'r';
1746                                 break;
1747                 }
1748                 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1749                 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1750                                 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1751         }
1752
1753         pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1754
1755         timeout.tv_usec = 0;
1756         while (!killed
1757                         || (total_elements - free_elements - pending_queue_length)
1758                         || emit_count
1759                         || pending_tail->flags) {
1760
1761                 if (!sigs) {
1762                         timeout.tv_sec = scan_interval;
1763                         select(0, 0, 0, 0, &timeout);
1764                 }
1765
1766                 if (sigs & SIGTERM_MASK && !killed) {
1767                         sigs &= ~SIGTERM_MASK;
1768                         my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1769                         scan_interval = 1;
1770                         frag_lifetime = -1;
1771                         active_lifetime = -1;
1772                         inactive_lifetime = -1;
1773                         emit_timeout = 1;
1774                         unpending_timeout = 1;
1775                         killed = 1;
1776                         pthread_cond_signal(&scan_cond);
1777                         pthread_cond_signal(&unpending_cond);
1778                 }
1779
1780 #if ((DEBUG) & DEBUG_I)
1781                 if (sigs & SIGUSR1_MASK) {
1782                         sigs &= ~SIGUSR1_MASK;
1783                         info_debug();
1784                 }
1785 #endif
1786         }
1787         remove(pidfilepath);
1788 #if ((DEBUG) & DEBUG_I)
1789         info_debug();
1790 #endif
1791         my_log(LOG_INFO, "Done.");
1792 #ifdef WALL
1793         return 0;
1794 #endif
1795 }