/* gitweb extraction artifact — original path: fprobe-ulog.git / src / fprobe-ulog.c */
1 /*
2         Copyright (C) Slava Astashonok <sla@0n.ru>
3
4         This program is free software; you can redistribute it and/or
5         modify it under the terms of the GNU General Public License.
6
7         $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
8
9                         Sapan Bhatia <sapanb@cs.princeton.edu> 
10                         
11         7/11/2007       Added data collection (-f) functionality, slice_id support in the header and log file
12                         rotation.
13
14         15/11/2007      Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
15
16 */
17
18 #include <common.h>
19
20 /* stdout, stderr, freopen() */
21 #include <stdio.h>
22
23 /* atoi(), exit() */
24 #include <stdlib.h>
25
/* getopt(), alarm(), getpid(), setsid(), chdir() */
27 #include <unistd.h>
28
29 /* strerror() */
30 #include <string.h>
31
32 /* sig*() */
33 #include <signal.h>
34
35 /* statfs() */
36
37 #include <sys/vfs.h>
38
39 #include <libipulog/libipulog.h>
40 #include "vserver.h"
41
/*
 * Local mirror of libipulog's (otherwise opaque) handle layout.
 * NOTE(review): redeclared here, presumably to reach the raw netlink fd
 * directly — must stay in sync with libipulog/libipulog.h.
 */
struct ipulog_handle {
	int fd;                      /* netlink socket descriptor */
	u_int8_t blocking;           /* non-zero => blocking reads */
	struct sockaddr_nl local;    /* our netlink address */
	struct sockaddr_nl peer;     /* kernel-side netlink address */
	struct nlmsghdr* last_nlhdr; /* cursor into the last received message batch */
};
49
50 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
51 #include <sys/types.h>
52 #include <netinet/in_systm.h>
53 #include <sys/socket.h>
54 #include <netinet/in.h>
55 #include <arpa/inet.h>
56 #include <netinet/ip.h>
57 #include <netinet/tcp.h>
58 #include <netinet/udp.h>
59 #include <netinet/ip_icmp.h>
60 #include <net/if.h>
61
62 #include <sys/param.h>
63 #include <pwd.h>
64 #ifdef OS_LINUX
65 #include <grp.h>
66 #endif
67
68 /* pthread_*() */
69 #include <pthread.h>
70
71 /* errno */
72 #include <errno.h>
73
74 /* getaddrinfo() */
75 #include <netdb.h>
76
77 /* nanosleep() */
78 #include <time.h>
79
80 /* gettimeofday() */
81 #include <sys/time.h>
82
83 /* scheduling */
84 #include <sched.h>
85
86 /* select() (POSIX)*/
87 #include <sys/select.h>
88
89 /* open() */
90 #include <sys/stat.h>
91 #include <fcntl.h>
92
93 #include <fprobe-ulog.h>
94 #include <my_log.h>
95 #include <my_getopt.h>
96 #include <netflow.h>
97 #include <hash.h>
98 #include <mem.h>
99
100 #define PIDFILE                 "/var/log/fprobe-ulog.pid"
101 #define LAST_EPOCH_FILE         "/var/log/fprobe_last_epoch"
102 #define MAX_EPOCH_SIZE          sizeof("32767")
103 #define STD_NETFLOW_PDU
104
/*
 * Indexes into the parms[] option table below.
 * IMPORTANT: the order of these enumerators must match the order of the
 * entries in parms[] exactly — the table is indexed by these values
 * (e.g. the mark_is_tos macro expands to parms[Mflag].count).
 */
enum {
	aflag,
	Bflag,
	bflag,
	cflag,
	dflag,
	Dflag,
	eflag,
	Eflag,
	fflag,
	gflag,
	hflag,
	lflag,
	mflag,
	Mflag,
	nflag,
	qflag,
	rflag,
	sflag,
	tflag,
	Tflag,
	Uflag,
	uflag,
	vflag,
	Wflag,
	Xflag,
};
132
133 static struct getopt_parms parms[] = {
134         {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
135         {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
136         {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
137         {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
138         {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
139         {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
140         {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
141         {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
142         {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
143         {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
144         {'h', 0, 0, 0},
145         {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
146         {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
147         {'M', 0, 0, 0},
148         {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
149         {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
150         {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
151         {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
152         {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
153         {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
154         {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
155         {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
156         {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
157         {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
158         {0, 0, 0, 0}
159 };
160
extern char *optarg;
extern int optind, opterr, optopt;
extern int errno;

/* NetFlow v1/v5/v7 packet templates, defined in netflow.c */
extern struct NetFlow NetFlow1;
extern struct NetFlow NetFlow5;
extern struct NetFlow NetFlow7;

/* Sentinel fd meaning "no data file opened yet" (see get_data_file_fd()) */
#define START_DATA_FD -5
/* -M flag: reuse the netfilter mark value as the ToS byte */
#define mark_is_tos parms[Mflag].count
static unsigned scan_interval = 5;      /* -s: seconds between expired-flow scans [5] */
static unsigned int min_free = 0;       /* -D: min free disk blocks before data is dropped */
static int frag_lifetime = 30;          /* -g: fragmented flow lifetime, seconds [30] */
static int inactive_lifetime = 60;      /* inactive flow timeout, seconds */
static int active_lifetime = 300;       /* -e: active flow lifetime, seconds [300] */
static int sockbufsize;                 /* -B: kernel capture buffer size */
#define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
#if (MEM_BITS == 0) || (MEM_BITS == 16)
#define BULK_QUANTITY 10000
#else
#define BULK_QUANTITY 200
#endif

/* Epoch-based log rotation state (see get_data_file_fd()):
 * epoch_length (-E) is in minutes; log_epochs (-T) is how many epochs are
 * kept before the file index wraps around. */
static unsigned epoch_length=60, log_epochs=1; 
static unsigned cur_epoch=0,prev_uptime=0,last_peak=0; /* last_peak = highest epoch index written so far */

static unsigned bulk_quantity = BULK_QUANTITY;  /* -b: memory bulk size */
static unsigned pending_queue_length = 100;     /* -q: pending queue length [100] */
static struct NetFlow *netflow = &NetFlow5;     /* -n: selected NetFlow template [v5] */
static unsigned verbosity = 6;                  /* -v: max log level [6=INFO] */
static unsigned log_dest = MY_LOG_SYSLOG;       /* -l: log destination */
static struct Time start_time;                  /* probe start, basis for "uptime" fields */
static long start_time_offset;
static int off_tl;
/* From mem.c */
extern unsigned total_elements;
extern unsigned free_elements;
extern unsigned total_memory;
#if ((DEBUG) & DEBUG_I)
/* Statistics counters reported by info_debug() */
static unsigned emit_pkts, emit_queue;
static uint64_t size_total;
static unsigned pkts_total, pkts_total_fragmented;
static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
static unsigned pkts_pending, pkts_pending_done;
static unsigned pending_queue_trace, pending_queue_trace_candidate;
static unsigned flows_total, flows_fragmented;
#endif
static unsigned emit_count;                     /* flows accumulated in emit_packet */
static uint32_t emit_sequence;
static unsigned emit_rate_bytes, emit_rate_delay; /* -t: pacing parameters */
static struct Time emit_time;
static uint8_t emit_packet[NETFLOW_MAX_PACKET]; /* outgoing PDU build buffer */
static pthread_t thid;
static sigset_t sig_mask;
static struct sched_param schedp;               /* -r: real-time priority */
static int sched_min, sched_max;
static int npeers, npeers_rot;
static struct peer *peers;                      /* collector list */
static int sigs;                                /* pending-signal bitmask set by sighandler() */
static char cur_output_file[MAX_PATH_LEN];      /* path of the current -f data file */

/* Flow hash table; one mutex per bucket */
static struct Flow *flows[1 << HASH_BITS];
static pthread_mutex_t flows_mutex[1 << HASH_BITS];

static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;

static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
static struct Flow *pending_head, *pending_tail;
static struct Flow *scan_frag_dreg;

/* Queue of expired flows awaiting serialisation by emit_thread() */
static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
static struct Flow *flows_emit;

static char ident[256] = "fprobe-ulog";         /* -l: log/pidfile identifier */
static FILE *pidfile;
static char *pidfilepath;
static pid_t pid;
static int killed;
static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
static struct ipulog_handle *ulog_handle;       /* ULOG capture handle */
static uint32_t ulog_gmask = 1;                 /* -U: ULOG group bitmask [1] */
static char *cap_buf;
static int nsnmp_rules;                         /* -X: interface-name -> SNMP index rules */
static struct snmp_rule *snmp_rules;
static struct passwd *pw = 0;                   /* -u: user to drop privileges to */
249
250 void usage()
251 {
252         fprintf(stdout,
253                         "fprobe-ulog: a NetFlow probe. Version %s\n"
254                         "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
255                         "\n"
256                         "-h\t\tDisplay this help\n"
257                         "-U <mask>\tULOG group bitwise mask [1]\n"
258                         "-s <seconds>\tHow often scan for expired flows [5]\n"
259                         "-g <seconds>\tFragmented flow lifetime [30]\n"
260                         "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
261                         "-f <filename>\tLog flow data in a file\n"
262                         "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
263                         "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
264                         "-a <address>\tUse <address> as source for NetFlow flow\n"
265                         "-X <rules>\tInterface name to SNMP-index conversion rules\n"
266                         "-M\t\tUse netfilter mark value as ToS flag\n"
267                         "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
268                         "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
269                         "-q <flows>\tPending queue length [100]\n"
270                         "-B <kilobytes>\tKernel capture buffer size [0]\n"
271                         "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
272                         "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
273                         "-c <directory>\tDirectory to chroot to\n"
274                         "-u <user>\tUser to run as\n"
275                         "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
276                         "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
277                         "-y <remote:port>\tAddress of the NetFlow collector\n"
278                         "-f <writable file>\tFile to write data into\n"
279                         "-T <n>\tRotate log file every n epochs\n"
280                         "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
281                         "-E <[1..60]>\tSize of an epoch in minutes\n"
282                         "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
283                         ,
284                 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
285         exit(0);
286 }
287
#if ((DEBUG) & DEBUG_I)
/* Dump the accumulated packet/flow/memory statistics counters via my_log().
 * Triggered from the main loop when SIGUSR1 has been recorded by sighandler(). */
void info_debug()
{
	/* NOTE(review): size_total is uint64_t but printed with %lld (signed);
	 * works on common targets, formally a format mismatch. */
	my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
			pkts_total, pkts_total_fragmented, size_total,
			pkts_pending - pkts_pending_done, pending_queue_trace);
	my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
			pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
	my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
			flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
	my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
			total_elements, free_elements, total_memory);
}
#endif
302
303 void sighandler(int sig)
304 {
305         switch (sig) {
306                 case SIGTERM:
307                         sigs |= SIGTERM_MASK;
308                         break;
309 #if ((DEBUG) & DEBUG_I)
310                 case SIGUSR1:
311                         sigs |= SIGUSR1_MASK;
312                         break;
313 #endif
314         }
315 }
316
317 void gettime(struct Time *now)
318 {
319         struct timeval t;
320
321         gettimeofday(&t, 0);
322         now->sec = t.tv_sec;
323         now->usec = t.tv_usec;
324 }
325
326
327 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
328 {
329         return (t1->sec - t2->sec)/60;
330 }
331
332 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
333 {
334         return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
335 }
336
/* Uptime in milliseconds, relative to start_time. */
uint32_t getuptime(struct Time *t)
{
	/* Maximum uptime is about 49/2 days (uint32_t milliseconds wraps) */
	return cmpmtime(t, &start_time);
}
343
/* Uptime in minutes, relative to start_time (used for epoch rotation). */
uint32_t getuptime_minutes(struct Time *t)
{
	/* Maximum uptime is about 49/2 days */
	return cmpMtime(t, &start_time);
}
350
351 hash_t hash_flow(struct Flow *flow)
352 {
353         if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
354         else return hash(flow, sizeof(struct Flow_TL));
355 }
356
357 uint16_t snmp_index(char *name) {
358         uint32_t i;
359
360         if (!*name) return 0;
361
362         for (i = 0; (int) i < nsnmp_rules; i++) {
363                 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
364                 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
365         }
366
367         if ((i = if_nametoindex(name))) return i;
368
369         return -1;
370 }
371
/*
 * Field-by-field copy of a flow record from src to dst.
 * Deliberately NOT a plain struct assignment: the list linkage (src->next)
 * must not be copied into the destination.
 */
inline void copy_flow(struct Flow *src, struct Flow *dst)
{
	dst->iif = src->iif;
	dst->oif = src->oif;
	dst->sip = src->sip;
	dst->dip = src->dip;
	dst->tos = src->tos;
	dst->slice_id = src->slice_id;
	dst->proto = src->proto;
	dst->tcp_flags = src->tcp_flags;
	dst->id = src->id;
	dst->sp = src->sp;
	dst->dp = src->dp;
	dst->pkts = src->pkts;
	dst->size = src->size;
	dst->sizeF = src->sizeF;
	dst->sizeP = src->sizeP;
	dst->ctime = src->ctime;
	dst->mtime = src->mtime;
	dst->flags = src->flags;
}
393
394 void read_cur_epoch() {
395         int fd;
396         /* Reset to -1 in case the read fails */
397         cur_epoch=-1;
398         fd = open(LAST_EPOCH_FILE, O_RDONLY);
399         if (fd != -1) {
400                 char snum[MAX_EPOCH_SIZE];
401                 ssize_t len;
402                 len = read(fd, snum, MAX_EPOCH_SIZE-1);
403                 if (len != -1) {
404                         snum[len]='\0';
405                         sscanf(snum,"%d",&cur_epoch);
406                         cur_epoch++; /* Let's not stone the last epoch */
407                         close(fd);
408                 }
409         }
410         return;
411 }
412
413
414 /* Dumps the current epoch in a file to cope with
415  * reboots and killings of fprobe */
416
417 void update_cur_epoch_file(int n) {
418         int fd, len;
419         char snum[MAX_EPOCH_SIZE];
420         len=snprintf(snum, MAX_EPOCH_SIZE-1,"%d", n);
421         fd = open(LAST_EPOCH_FILE, O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
422         if (fd == -1) {
423                 my_log(LOG_ERR, "open() failed: %s.The next restart will resume logging from epoch id 0.",LAST_EPOCH_FILE);
424                 return;
425         }
426         write(fd, snum, len);
427         close(fd);
428 }
429
/* Get the file descriptor corresponding to the current file.
 * The kludgy implementation is to abstract away the 'current
 * file descriptor', which may also be a socket.
 *
 * Rotation model: files are named <fname>.<epoch> with epoch in
 * [0, log_epochs); when epoch_length minutes have elapsed (or at start-up,
 * or when the disk-full sentinel fires) the current file is closed,
 * gzip-compressed, and the next epoch's file is opened.
 */

unsigned get_data_file_fd(char *fname, int cur_fd) {
	struct Time now;
	unsigned cur_uptime;

	struct statfs statfs;
	int ret_fd;

	/* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
	 * doesn't solve the problem */
	gettime(&now);
	cur_uptime = getuptime_minutes(&now);

	if (cur_fd != START_DATA_FD) {
		if (fstatfs(cur_fd, &statfs) == -1) {
			my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
		}
		else {
			/* Only trigger the wrap-around when we're writing the newest
			 * epoch ever produced (cur_epoch==last_peak); otherwise we're
			 * already overwriting old files and reclaiming their space. */
			if (min_free && (statfs.f_bavail < min_free)
					&& (cur_epoch==last_peak))
			{
				my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
				/* cur_epoch is unsigned: -1 wraps to UINT_MAX and acts as
				 * the "restart from epoch 0" sentinel tested below. */
				cur_epoch = -1;
			}
			/*
			   else 
			   assume that we can reclaim space by overwriting our own files
			   and that the difference in size will not fill the disk - sapan
			   */
		}
	}

	/* If epoch length has been exceeded, 
	 * or we're starting up 
	 * or we're going back to the first epoch */
	if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
		int write_fd;
		prev_uptime = cur_uptime;
		cur_epoch = (cur_epoch + 1) % log_epochs;
		if (cur_epoch>last_peak) last_peak = cur_epoch;
		if (cur_fd>0) {
			close(cur_fd);
			/* Compress the finished file */
			/* SECURITY(review): cur_output_file (derived from the -f
			 * argument) is interpolated into a shell command; if that path
			 * can contain shell metacharacters this is command injection.
			 * Confirm -f is operator-controlled only. */
			char gzip_cmd[MAX_PATH_LEN+sizeof("gzip -f ")]; 
			snprintf(gzip_cmd, MAX_PATH_LEN+sizeof("gzip -f "),"gzip -f %s",cur_output_file);
			system(gzip_cmd);
		}
		snprintf(cur_output_file,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
		if ((write_fd = open(cur_output_file, O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH)) < 0) {
			my_log(LOG_ERR, "open(): %s (%s)\n", cur_output_file, strerror(errno));
			exit(1);
		}
		if (fchmod(write_fd,S_IRUSR|S_IWUSR|S_IROTH|S_IRGRP) == -1) {
			my_log(LOG_ERR, "fchmod() failed: %s (%s). Continuing...\n", cur_output_file, strerror(errno));
		}
		update_cur_epoch_file(cur_epoch);
		ret_fd = write_fd;
	}
	else {
		ret_fd = cur_fd;
	}
	return(ret_fd);
}
497
498 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
499 {
500         struct Flow **flowpp;
501
502 #ifdef WALL
503         flowpp = 0;
504 #endif
505
506         if (prev) flowpp = *prev;
507
508         while (where) {
509                 if (where->sip.s_addr == what->sip.s_addr
510                                 && where->dip.s_addr == what->dip.s_addr
511                                 && where->proto == what->proto) {
512                         switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
513                                 case 0:
514                                         /* Both unfragmented */
515                                         if ((what->sp == where->sp)
516                                                         && (what->dp == where->dp)) goto done;
517                                         break;
518                                 case 2:
519                                         /* Both fragmented */
520                                         if (where->id == what->id) goto done;
521                                         break;
522                         }
523                 }
524                 flowpp = &where->next;
525                 where = where->next;
526         }
527 done:
528         if (prev) *prev = flowpp;
529         return where;
530 }
531
532         int put_into(struct Flow *flow, int flag
533 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
534                         , char *logbuf
535 #endif
536                     )
537 {
538         int ret = 0;
539         hash_t h;
540         struct Flow *flown, **flowpp;
541 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
542         char buf[64];
543 #endif
544
545         h = hash_flow(flow);
546 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
547         sprintf(buf, " %x H:%04x", (unsigned) flow, h);
548         strcat(logbuf, buf);
549 #endif
550         pthread_mutex_lock(&flows_mutex[h]);
551         flowpp = &flows[h];
552         if (!(flown = find(flows[h], flow, &flowpp))) {
553                 /* No suitable flow found - add */
554                 if (flag == COPY_INTO) {
555                         if ((flown = mem_alloc())) {
556                                 copy_flow(flow, flown);
557                                 flow = flown;
558                         } else {
559 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
560                                 my_log(LOG_ERR, "%s %s. %s",
561                                                 "mem_alloc():", strerror(errno), "packet lost");
562 #endif
563                                 return -1;
564                         }
565                 }
566                 flow->next = flows[h];
567                 flows[h] = flow;
568 #if ((DEBUG) & DEBUG_I)
569                 flows_total++;
570                 if (flow->flags & FLOW_FRAG) flows_fragmented++;
571 #endif
572 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
573                 if (flown) {
574                         sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
575                         strcat(logbuf, buf);
576                 }
577 #endif
578         } else {
579                 /* Found suitable flow - update */
580 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
581                 sprintf(buf, " +> %x", (unsigned) flown);
582                 strcat(logbuf, buf);
583 #endif
584                 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
585                         flown->mtime = flow->mtime;
586                 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
587                         flown->ctime = flow->ctime;
588                 flown->tcp_flags |= flow->tcp_flags;
589                 flown->size += flow->size;
590                 flown->pkts += flow->pkts;
591
592                 /* The slice_id of the first slice_id of a flow is misleading. Reset the slice_id of the flow
593                  * if a better value comes along. A good example of this is that by the time CoDemux sets the
594                  * peercred of a flow, it has already been accounted for here and attributed to root. */
595
596                 if (flown->slice_id<1)
597                                 flown->slice_id = flow->slice_id;
598
599
600                 if (flow->flags & FLOW_FRAG) {
601                         /* Fragmented flow require some additional work */
602                         if (flow->flags & FLOW_TL) {
603                                 /*
604                                    ?FIXME?
605                                    Several packets with FLOW_TL (attack)
606                                    */
607                                 flown->sp = flow->sp;
608                                 flown->dp = flow->dp;
609                         }
610                         if (flow->flags & FLOW_LASTFRAG) {
611                                 /*
612                                    ?FIXME?
613                                    Several packets with FLOW_LASTFRAG (attack)
614                                    */
615                                 flown->sizeP = flow->sizeP;
616                         }
617                         flown->flags |= flow->flags;
618                         flown->sizeF += flow->sizeF;
619                         if ((flown->flags & FLOW_LASTFRAG)
620                                         && (flown->sizeF >= flown->sizeP)) {
621                                 /* All fragments received - flow reassembled */
622                                 *flowpp = flown->next;
623                                 pthread_mutex_unlock(&flows_mutex[h]);
624 #if ((DEBUG) & DEBUG_I)
625                                 flows_total--;
626                                 flows_fragmented--;
627 #endif
628                                 flown->id = 0;
629                                 flown->flags &= ~FLOW_FRAG;
630 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
631                                 strcat(logbuf," R");
632 #endif
633                                 ret = put_into(flown, MOVE_INTO
634 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
635                                                 , logbuf
636 #endif
637                                               );
638                         }
639                 }
640                 if (flag == MOVE_INTO) mem_free(flow);
641         }
642         pthread_mutex_unlock(&flows_mutex[h]);
643         return ret;
644 }
645
int onlyonce=0; /* once-only latch intended to limit the SRC/DST-mask warning in fill() */
647
648 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
649 {
650         int i;
651
652         for (i = 0; i < fields; i++) {
653 #if ((DEBUG) & DEBUG_F)
654                 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
655 #endif
656                 switch (format[i]) {
657                         case NETFLOW_IPV4_SRC_ADDR:
658                                 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
659                                 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
660                                 break;
661
662                         case NETFLOW_IPV4_DST_ADDR:
663                                 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
664                                 if ((flow->dip.s_addr == inet_addr("10.0.0.8"))) {
665                                         my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
666                                 }
667                                 p += NETFLOW_IPV4_DST_ADDR_SIZE;
668                                 break;
669
670                         case NETFLOW_INPUT_SNMP:
671                                 *((uint16_t *) p) = htons(flow->iif);
672                                 p += NETFLOW_INPUT_SNMP_SIZE;
673                                 break;
674
675                         case NETFLOW_OUTPUT_SNMP:
676                                 *((uint16_t *) p) = htons(flow->oif);
677                                 p += NETFLOW_OUTPUT_SNMP_SIZE;
678                                 break;
679
680                         case NETFLOW_PKTS_32:
681                                 *((uint32_t *) p) = htonl(flow->pkts);
682                                 p += NETFLOW_PKTS_32_SIZE;
683                                 break;
684
685                         case NETFLOW_BYTES_32:
686                                 *((uint32_t *) p) = htonl(flow->size);
687                                 p += NETFLOW_BYTES_32_SIZE;
688                                 break;
689
690                         case NETFLOW_FIRST_SWITCHED:
691                                 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
692                                 p += NETFLOW_FIRST_SWITCHED_SIZE;
693                                 break;
694
695                         case NETFLOW_LAST_SWITCHED:
696                                 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
697                                 p += NETFLOW_LAST_SWITCHED_SIZE;
698                                 break;
699
700                         case NETFLOW_L4_SRC_PORT:
701                                 *((uint16_t *) p) = flow->sp;
702                                 p += NETFLOW_L4_SRC_PORT_SIZE;
703                                 break;
704
705                         case NETFLOW_L4_DST_PORT:
706                                 *((uint16_t *) p) = flow->dp;
707                                 p += NETFLOW_L4_DST_PORT_SIZE;
708                                 break;
709
710                         case NETFLOW_PROT:
711                                 *((uint8_t *) p) = flow->proto;
712                                 p += NETFLOW_PROT_SIZE;
713                                 break;
714
715                         case NETFLOW_SRC_TOS:
716                                 *((uint8_t *) p) = flow->tos;
717                                 p += NETFLOW_SRC_TOS_SIZE;
718                                 break;
719
720                         case NETFLOW_TCP_FLAGS:
721                                 *((uint8_t *) p) = flow->tcp_flags;
722                                 p += NETFLOW_TCP_FLAGS_SIZE;
723                                 break;
724
725                         case NETFLOW_VERSION:
726                                 *((uint16_t *) p) = htons(netflow->Version);
727                                 p += NETFLOW_VERSION_SIZE;
728                                 break;
729
730                         case NETFLOW_COUNT:
731                                 *((uint16_t *) p) = htons(emit_count);
732                                 p += NETFLOW_COUNT_SIZE;
733                                 break;
734
735                         case NETFLOW_UPTIME:
736                                 *((uint32_t *) p) = htonl(getuptime(&emit_time));
737                                 p += NETFLOW_UPTIME_SIZE;
738                                 break;
739
740                         case NETFLOW_UNIX_SECS:
741                                 *((uint32_t *) p) = htonl(emit_time.sec);
742                                 p += NETFLOW_UNIX_SECS_SIZE;
743                                 break;
744
745                         case NETFLOW_UNIX_NSECS:
746                                 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
747                                 p += NETFLOW_UNIX_NSECS_SIZE;
748                                 break;
749
750                         case NETFLOW_FLOW_SEQUENCE:
751                                 //*((uint32_t *) p) = htonl(emit_sequence);
752                                 *((uint32_t *) p) = 0;
753                                 p += NETFLOW_FLOW_SEQUENCE_SIZE;
754                                 break;
755
756                         case NETFLOW_PAD8:
757                                 /* Unsupported (uint8_t) */
758                         case NETFLOW_ENGINE_TYPE:
759                         case NETFLOW_ENGINE_ID:
760                         case NETFLOW_FLAGS7_1:
761                         case NETFLOW_SRC_MASK:
762                         case NETFLOW_DST_MASK:
763                                 if (onlyonce) {
764                                         my_log(LOG_CRIT, "Adding SRC/DST masks: this version of fprobe is seriously broken\n");
765                                         onlyonce=1;
766                                 }
767                                 *((uint8_t *) p) = 0;
768                                 p += NETFLOW_PAD8_SIZE;
769                                 break;
770                         case NETFLOW_SLICE_ID:
771                                 *((uint32_t *) p) = flow->slice_id;
772                                 p += NETFLOW_SLICE_ID_SIZE;
773                                 break;
774                         case NETFLOW_PAD16:
775                                 /* Unsupported (uint16_t) */
776                         case NETFLOW_SRC_AS:
777                         case NETFLOW_DST_AS:
778                         case NETFLOW_FLAGS7_2:
779                                 *((uint16_t *) p) = 0;
780                                 p += NETFLOW_PAD16_SIZE;
781                                 break;
782
783                         case NETFLOW_PAD32:
784                                 /* Unsupported (uint32_t) */
785                         case NETFLOW_IPV4_NEXT_HOP:
786                         case NETFLOW_ROUTER_SC:
787                                 *((uint32_t *) p) = 0;
788                                 p += NETFLOW_PAD32_SIZE;
789                                 break;
790
791                         default:
792                                 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
793                                                 format, i, format[i]);
794                                 exit(1);
795                 }
796         }
797 #if ((DEBUG) & DEBUG_F)
798         my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
799 #endif
800         return p;
801 }
802
/* Drop privileges to the -u user (if any) in the calling thread. */
void setuser() {
	/*
	   Workaround for clone()-based threads
	   Try to change EUID independently of main thread
	   */
	if (pw) {
		/* NOTE(review): the return values of setgroups/setregid/setreuid
		 * are ignored here; a failed drop leaves this thread privileged.
		 * Order (supplementary groups, gid, then uid) is the correct one. */
		setgroups(0, NULL);
		setregid(pw->pw_gid, pw->pw_gid);
		setreuid(pw->pw_uid, pw->pw_uid);
	}
}
814
815 void *emit_thread()
816 {
817         struct Flow *flow;
818         void *p;
819         struct timeval now;
820         struct timespec timeout;
821         int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
822
823         p = (void *) &emit_packet + netflow->HeaderSize;
824         timeout.tv_nsec = 0;
825
826         setuser();
827         
828         for (;;) {
829                 pthread_mutex_lock(&emit_mutex);
830                 while (!flows_emit) {
831                         gettimeofday(&now, 0);
832                         timeout.tv_sec = now.tv_sec + emit_timeout;
833                         /* Do not wait until emit_packet will filled - it may be too long */
834                         if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
835                 //my_log(LOG_INFO,"Timeout: %d, %d",emit_count, timeout.tv_sec);
836                                 pthread_mutex_unlock(&emit_mutex);
837                                 goto sendit;
838                         }
839                 }
840                 flow = flows_emit;
841                 flows_emit = flows_emit->next;
842 #if ((DEBUG) & DEBUG_I)
843                 emit_queue--;
844 #endif          
845                 pthread_mutex_unlock(&emit_mutex);
846
847 #ifdef UPTIME_TRICK
848                 if (!emit_count) {
849                         gettime(&start_time);
850                         start_time.sec -= start_time_offset;
851                 }
852 #endif
853                 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
854                 mem_free(flow);
855                 emit_count++;
856 #ifdef PF2_DEBUG
857                 printf("Emit count = %d\n", emit_count);
858                 fflush(stdout);
859 #endif
860                 if (emit_count == netflow->MaxFlows) {
861 sendit:
862                         gettime(&emit_time);
863                         p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
864                         size = netflow->HeaderSize + emit_count * netflow->FlowSize;
865                         /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
866 #ifdef STD_NETFLOW_PDU
867                         if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
868 #endif
869                         peer_rot_cur = 0;
870                         for (i = 0; i < npeers; i++) {
871                                 if (peers[i].type == PEER_FILE) {
872                                         if (netflow->SeqOffset)
873                                                 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
874                                         peers[i].write_fd = get_data_file_fd(peers[i].fname, peers[i].write_fd);
875                                         ret = write(peers[i].write_fd, emit_packet, size);
876                                         if (ret < size) {
877
878 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
879                                                 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
880                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
881 #endif
882 #undef MESSAGES
883                                         }
884 #if ((DEBUG) & DEBUG_E)
885                                         else {
886                                                 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
887                                                                 emit_count, i + 1, peers[i].seq);
888                                         }
889 #endif
890                                         peers[i].seq += emit_count;
891
892                                         /* Rate limit */
893                                         if (emit_rate_bytes) {
894                                                 sent += size;
895                                                 delay = sent / emit_rate_bytes;
896                                                 if (delay) {
897                                                         sent %= emit_rate_bytes;
898                                                         timeout.tv_sec = 0;
899                                                         timeout.tv_nsec = emit_rate_delay * delay;
900                                                         while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
901                                                 }
902                                         }
903                                 }
904                                 else
905                                         if (peers[i].type == PEER_MIRROR) goto sendreal;
906                                         else
907                                                 if (peers[i].type == PEER_ROTATE) 
908                                                         if (peer_rot_cur++ == peer_rot_work) {
909 sendreal:
910                                                                 if (netflow->SeqOffset)
911                                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
912                                                                 ret = send(peers[i].write_fd, emit_packet, size, 0);
913                                                                 if (ret < size) {
914 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
915                                                                         my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
916                                                                                         i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
917 #endif
918                                                                 }
919 #if ((DEBUG) & DEBUG_E)
920                                                                 else {
921                                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
922                                                                                         emit_count, i + 1, peers[i].seq);
923                                                                 }
924 #endif
925                                                                 peers[i].seq += emit_count;
926
927                                                                 /* Rate limit */
928                                                                 if (emit_rate_bytes) {
929                                                                         sent += size;
930                                                                         delay = sent / emit_rate_bytes;
931                                                                         if (delay) {
932                                                                                 sent %= emit_rate_bytes;
933                                                                                 timeout.tv_sec = 0;
934                                                                                 timeout.tv_nsec = emit_rate_delay * delay;
935                                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
936                                                                         }
937                                                                 }
938                                                         }
939                         }
940                         if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
941                         emit_sequence += emit_count;
942                         emit_count = 0;
943 #if ((DEBUG) & DEBUG_I)
944                         emit_pkts++;
945 #endif
946                 }
947         }
948 }       
949
950 void *unpending_thread()
951 {
952         struct timeval now;
953         struct timespec timeout;
954 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
955         char logbuf[256];
956 #endif
957
958         setuser();
959
960         timeout.tv_nsec = 0;
961         pthread_mutex_lock(&unpending_mutex);
962
963         for (;;) {
964                 while (!(pending_tail->flags & FLOW_PENDING)) {
965                         gettimeofday(&now, 0);
966                         timeout.tv_sec = now.tv_sec + unpending_timeout;
967                         pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
968                 }
969
970 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
971                 *logbuf = 0;
972 #endif
973                 if (put_into(pending_tail, COPY_INTO
974 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
975                                         , logbuf
976 #endif
977                             ) < 0) {
978 #if ((DEBUG) & DEBUG_I)
979                         pkts_lost_unpending++;
980 #endif                          
981                 }
982
983 #if ((DEBUG) & DEBUG_U)
984                 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
985 #endif
986
987                 pending_tail->flags = 0;
988                 pending_tail = pending_tail->next;
989 #if ((DEBUG) & DEBUG_I)
990                 pkts_pending_done++;
991 #endif
992         }
993 }
994
void *scan_thread()
{
	/*
	   Scanner thread: every scan_interval seconds walks the whole flow
	   hash table and retires flows.

	   - An expired fragmented flow is unlinked into the private
	     scan_frag_dreg chain (with FLOW_FRAG cleared) and afterwards
	     re-inserted with put_into(MOVE_INTO), so its accumulated data
	     can merge with any matching non-fragment flow.
	   - An expired complete flow is spliced onto the flows_emit list
	     for emit_thread (emit_mutex held only for the splice).
	   */
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
	char logbuf[256];
#endif
	int i;
	struct Flow *flow, **flowpp;
	struct Time now;
	struct timespec timeout;

	setuser();

	timeout.tv_nsec = 0;
	pthread_mutex_lock(&scan_mutex);

	for (;;) {
		/* Sleep scan_interval seconds (or until scan_cond is signalled) */
		gettime(&now);
		timeout.tv_sec = now.sec + scan_interval;
		pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);

		gettime(&now);
#if ((DEBUG) & DEBUG_S)
		my_log(LOG_DEBUG, "S: %d", now.sec);
#endif
		for (i = 0; i < 1 << HASH_BITS ; i++) {
			pthread_mutex_lock(&flows_mutex[i]);
			flow = flows[i];
			/* flowpp always points at the link that points at 'flow',
			   so unlinking is a single '*flowpp = flow->next' */
			flowpp = &flows[i];
			while (flow) {
				if (flow->flags & FLOW_FRAG) {
					/* Process fragmented flow */
					if ((now.sec - flow->mtime.sec) > frag_lifetime) {
						/* Fragmented flow expired - put it into special chain */
#if ((DEBUG) & DEBUG_I)
						flows_fragmented--;
						flows_total--;
#endif
						*flowpp = flow->next;
						flow->id = 0;
						flow->flags &= ~FLOW_FRAG;
						flow->next = scan_frag_dreg;
						scan_frag_dreg = flow;
						flow = *flowpp;
						continue;
					}
				} else {
					/* Flow is not fragmented */
					if ((now.sec - flow->mtime.sec) > inactive_lifetime
							|| (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
						/* Flow expired: unlink and hand to emit_thread */
#if ((DEBUG) & DEBUG_S)
						my_log(LOG_DEBUG, "S: E %x", flow);
#endif
#if ((DEBUG) & DEBUG_I)
						flows_total--;
#endif
						*flowpp = flow->next;
						pthread_mutex_lock(&emit_mutex);
						flow->next = flows_emit;
						flows_emit = flow;
#if ((DEBUG) & DEBUG_I)
						emit_queue++;
#endif
						pthread_mutex_unlock(&emit_mutex);
						flow = *flowpp;
						continue;
					}
				}
				flowpp = &flow->next;
				flow = flow->next;
			} /* chain loop */
			pthread_mutex_unlock(&flows_mutex[i]);
		} /* hash loop */
		if (flows_emit) pthread_cond_signal(&emit_cond);

		/* Re-insert the retired fragment flows collected above */
		while (scan_frag_dreg) {
			flow = scan_frag_dreg;
			scan_frag_dreg = flow->next;
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
			*logbuf = 0;
#endif
			put_into(flow, MOVE_INTO
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
					, logbuf
#endif
				);
#if ((DEBUG) & DEBUG_S)
			my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
#endif
		}
	}
}
1087
1088 void *cap_thread()
1089 {
1090         struct ulog_packet_msg *ulog_msg;
1091         struct ip *nl;
1092         void *tl;
1093         struct Flow *flow;
1094         int len, off_frag, psize;
1095 #if ((DEBUG) & DEBUG_C)
1096         char buf[64];
1097         char logbuf[256];
1098 #endif
1099         int challenge;
1100
1101         setuser();
1102
1103         while (!killed) {
1104                 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1105                 if (len <= 0) {
1106                         my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1107                         continue;
1108                 }
1109                 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1110
1111 #if ((DEBUG) & DEBUG_C)
1112                         sprintf(logbuf, "C: %d", ulog_msg->data_len);
1113 #endif
1114
1115                         nl = (void *) &ulog_msg->payload;
1116                         psize = ulog_msg->data_len;
1117
1118                         /* Sanity check */
1119                         if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1120 #if ((DEBUG) & DEBUG_C)
1121                                 strcat(logbuf, " U");
1122                                 my_log(LOG_DEBUG, "%s", logbuf);
1123 #endif
1124 #if ((DEBUG) & DEBUG_I)
1125                                 pkts_ignored++;
1126 #endif
1127                                 continue;
1128                         }
1129
1130                         if (pending_head->flags) {
1131 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1132                                 my_log(LOG_ERR,
1133 # if ((DEBUG) & DEBUG_C)
1134                                                 "%s %s %s", logbuf,
1135 # else
1136                                                 "%s %s",
1137 # endif
1138                                                 "pending queue full:", "packet lost");
1139 #endif
1140 #if ((DEBUG) & DEBUG_I)
1141                                 pkts_lost_capture++;
1142 #endif
1143                                 goto done;
1144                         }
1145
1146 #if ((DEBUG) & DEBUG_I)
1147                         pkts_total++;
1148 #endif
1149
1150                         flow = pending_head;
1151
1152                         /* ?FIXME? Add sanity check for ip_len? */
1153                         flow->size = ntohs(nl->ip_len);
1154 #if ((DEBUG) & DEBUG_I)
1155                         size_total += flow->size;
1156 #endif
1157
1158                         flow->sip = nl->ip_src;
1159                         flow->dip = nl->ip_dst;
1160                         flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1161                         
1162                         /* It's going to be expensive calling this syscall on every flow.
1163                          * We should keep a local hash table, for now just bear the overhead... - Sapan*/
1164
1165                         flow->slice_id=0;
1166
1167                         if (ulog_msg->mark > 0) {
1168                                 flow->slice_id = xid_to_slice_id(ulog_msg->mark);
1169                         }
1170
1171                         if (flow->slice_id < 1)
1172                                 flow->slice_id = ulog_msg->mark; // Couldn't look up the slice id, let's at least store the local xid
1173
1174
1175                         if ((flow->dip.s_addr == inet_addr("10.0.0.8")) || (flow->sip.s_addr == inet_addr("10.0.0.8"))) {
1176                                 my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->slice_id);
1177                         }
1178                         flow->iif = snmp_index(ulog_msg->indev_name);
1179                         flow->oif = snmp_index(ulog_msg->outdev_name);
1180                         flow->proto = nl->ip_p;
1181                         flow->id = 0;
1182                         flow->tcp_flags = 0;
1183                         flow->pkts = 1;
1184                         flow->sizeF = 0;
1185                         flow->sizeP = 0;
1186                         /* Packets captured from OUTPUT table didn't contains valid timestamp */
1187                         if (ulog_msg->timestamp_sec) {
1188                                 flow->ctime.sec = ulog_msg->timestamp_sec;
1189                                 flow->ctime.usec = ulog_msg->timestamp_usec;
1190                         } else gettime(&flow->ctime);
1191                         flow->mtime = flow->ctime;
1192
1193                         off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1194
1195                         /*
1196                            Offset (from network layer) to transport layer header/IP data
1197                            IOW IP header size ;-)
1198
1199                            ?FIXME?
1200                            Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1201                            */
1202                         off_tl = nl->ip_hl << 2;
1203                         tl = (void *) nl + off_tl;
1204
1205                         /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1206                         flow->sizeF = ntohs(nl->ip_len) - off_tl;
1207                         psize -= off_tl;
1208                         if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1209                         if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1210
1211                         if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1212                                 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1213 #if ((DEBUG) & DEBUG_C)
1214                                 strcat(logbuf, " F");
1215 #endif
1216 #if ((DEBUG) & DEBUG_I)
1217                                 pkts_total_fragmented++;
1218 #endif
1219                                 flow->flags |= FLOW_FRAG;
1220                                 flow->id = nl->ip_id;
1221
1222                                 if (!(ntohs(nl->ip_off) & IP_MF)) {
1223                                         /* Packet whith IP_MF contains information about whole datagram size */
1224                                         flow->flags |= FLOW_LASTFRAG;
1225                                         /* size = frag_offset*8 + data_size */
1226                                         flow->sizeP = off_frag + flow->sizeF;
1227                                 }
1228                         }
1229
1230 #if ((DEBUG) & DEBUG_C)
1231                         sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1232                         strcat(logbuf, buf);
1233                         sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1234                         strcat(logbuf, buf);
1235 #endif
1236
1237                         /*
1238                            Fortunately most interesting transport layer information fit
1239                            into first 8 bytes of IP data field (minimal nonzero size).
1240                            Thus we don't need actual packet reassembling to build whole
1241                            transport layer data. We only check the fragment offset for
1242                            zero value to find packet with this information.
1243                            */
1244                         if (!off_frag && psize >= 8) {
1245                                 switch (flow->proto) {
1246                                         case IPPROTO_TCP:
1247                                         case IPPROTO_UDP:
1248                                                 flow->sp = ((struct udphdr *)tl)->uh_sport;
1249                                                 flow->dp = ((struct udphdr *)tl)->uh_dport;
1250                                                 goto tl_known;
1251
1252 #ifdef ICMP_TRICK
1253                                         case IPPROTO_ICMP:
1254                                                 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1255                                                 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1256                                                 goto tl_known;
1257 #endif
1258 #ifdef ICMP_TRICK_CISCO
1259                                         case IPPROTO_ICMP:
1260                                                 flow->dp = *((int32_t *) tl);
1261                                                 goto tl_known;
1262 #endif
1263
1264                                         default:
1265                                                 /* Unknown transport layer */
1266 #if ((DEBUG) & DEBUG_C)
1267                                                 strcat(logbuf, " U");
1268 #endif
1269                                                 flow->sp = 0;
1270                                                 flow->dp = 0;
1271                                                 break;
1272
1273 tl_known:
1274 #if ((DEBUG) & DEBUG_C)
1275                                                 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1276                                                 strcat(logbuf, buf);
1277 #endif
1278                                                 flow->flags |= FLOW_TL;
1279                                 }
1280                         }
1281
1282                         /* Check for tcp flags presence (including CWR and ECE). */
1283                         if (flow->proto == IPPROTO_TCP
1284                                         && off_frag < 16
1285                                         && psize >= 16 - off_frag) {
1286                                 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1287 #if ((DEBUG) & DEBUG_C)
1288                                 sprintf(buf, " TCP:%x", flow->tcp_flags);
1289                                 strcat(logbuf, buf);
1290 #endif
1291                         }
1292
1293 #if ((DEBUG) & DEBUG_C)
1294                         sprintf(buf, " => %x", (unsigned) flow);
1295                         strcat(logbuf, buf);
1296                         my_log(LOG_DEBUG, "%s", logbuf);
1297 #endif
1298
1299 #if ((DEBUG) & DEBUG_I)
1300                         pkts_pending++;
1301                         pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1302                         if (pending_queue_trace < pending_queue_trace_candidate)
1303                                 pending_queue_trace = pending_queue_trace_candidate;
1304 #endif
1305
1306                         /* Flow complete - inform unpending_thread() about it */
1307                         pending_head->flags |= FLOW_PENDING;
1308                         pending_head = pending_head->next;
1309 done:
1310                         pthread_cond_signal(&unpending_cond);
1311                 }
1312         }
1313         return 0;
1314 }
1315
1316 /* Copied out of CoDemux */
1317
1318 static int init_daemon() {
1319         pid_t pid;
1320         FILE *pidfile;
1321
1322         pidfile = fopen(PIDFILE, "w");
1323         if (pidfile == NULL) {
1324                 my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
1325         }
1326
1327         if ((pid = fork()) < 0) {
1328                 fclose(pidfile);
1329                 my_log(LOG_ERR, "Could not fork!\n");
1330                 return(-1);
1331         }
1332         else if (pid != 0) {
1333                 /* i'm the parent, writing down the child pid  */
1334                 fprintf(pidfile, "%u\n", pid);
1335                 fclose(pidfile);
1336                 exit(0);
1337         }
1338
1339         /* close the pid file */
1340         fclose(pidfile);
1341
1342         /* routines for any daemon process
1343            1. create a new session 
1344            2. change directory to the root
1345            3. change the file creation permission 
1346            */
1347         setsid();
1348         chdir("/var/local/fprobe");
1349         umask(0);
1350
1351         return(0);
1352 }
1353
1354 int main(int argc, char **argv)
1355 {
1356         char errpbuf[512];
1357         char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1358         int c, i, write_fd, memory_limit = 0;
1359         struct addrinfo hints, *res;
1360         struct sockaddr_in saddr;
1361         pthread_attr_t tattr;
1362         struct sigaction sigact;
1363         static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1364         struct timeval timeout;
1365
1366         sched_min = sched_get_priority_min(SCHED);
1367         sched_max = sched_get_priority_max(SCHED);
1368
1369         memset(&saddr, 0 , sizeof(saddr));
1370         memset(&hints, 0 , sizeof(hints));
1371         hints.ai_flags = AI_PASSIVE;
1372         hints.ai_family = AF_INET;
1373         hints.ai_socktype = SOCK_DGRAM;
1374
1375         /* Process command line options */
1376
1377         opterr = 0;
1378         while ((c = my_getopt(argc, argv, parms)) != -1) {
1379                 switch (c) {
1380                         case '?':
1381                                 usage();
1382
1383                         case 'h':
1384                                 usage();
1385                 }
1386         }
1387
1388         if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1389         if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1390         if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1391         if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1392         if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1393         if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1394         if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1395         if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1396         if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1397         if (parms[nflag].count) {
1398                 switch (atoi(parms[nflag].arg)) {
1399                         case 1:
1400                                 netflow = &NetFlow1;
1401                                 break;
1402
1403                         case 5:
1404                                 break;
1405
1406                         case 7:
1407                                 netflow = &NetFlow7;
1408                                 break;
1409
1410                         default:
1411                                 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1412                                 exit(1);
1413                 }
1414         }
1415         if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1416         if (parms[lflag].count) {
1417                 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1418                         *log_suffix++ = 0;
1419                         if (*log_suffix) {
1420                                 sprintf(errpbuf, "[%s]", log_suffix);
1421                                 strcat(ident, errpbuf);
1422                         }
1423                 }
1424                 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1425                 if (log_suffix) *--log_suffix = ':';
1426         }
1427         if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1428 err_malloc:
1429                 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1430                 exit(1);
1431         }
1432         sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1433         if (parms[qflag].count) {
1434                 pending_queue_length = atoi(parms[qflag].arg);
1435                 if (pending_queue_length < 1) {
1436                         fprintf(stderr, "Illegal %s\n", "pending queue length");
1437                         exit(1);
1438                 }
1439         }
1440         if (parms[rflag].count) {
1441                 schedp.sched_priority = atoi(parms[rflag].arg);
1442                 if (schedp.sched_priority
1443                                 && (schedp.sched_priority < sched_min
1444                                         || schedp.sched_priority > sched_max)) {
1445                         fprintf(stderr, "Illegal %s\n", "realtime priority");
1446                         exit(1);
1447                 }
1448         }
1449         if (parms[Bflag].count) {
1450                 sockbufsize = atoi(parms[Bflag].arg) << 10;
1451         }
1452         if (parms[bflag].count) {
1453                 bulk_quantity = atoi(parms[bflag].arg);
1454                 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1455                         fprintf(stderr, "Illegal %s\n", "bulk size");
1456                         exit(1);
1457                 }
1458         }
1459         if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1460         if (parms[Xflag].count) {
1461                 for(i = 0; parms[Xflag].arg[i]; i++)
1462                         if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1463                 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1464                         goto err_malloc;
1465                 rule = strtok(parms[Xflag].arg, ":");
1466                 for (i = 0; rule; i++) {
1467                         snmp_rules[i].len = strlen(rule);
1468                         if (snmp_rules[i].len > IFNAMSIZ) {
1469                                 fprintf(stderr, "Illegal %s\n", "interface basename");
1470                                 exit(1);
1471                         }
1472                         strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1473                         if (!*(rule - 1)) *(rule - 1) = ',';
1474                         rule = strtok(NULL, ",");
1475                         if (!rule) {
1476                                 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1477                                 exit(1);
1478                         }
1479                         snmp_rules[i].base = atoi(rule);
1480                         *(rule - 1) = ':';
1481                         rule = strtok(NULL, ":");
1482                 }
1483                 nsnmp_rules = i;
1484         }
1485         if (parms[tflag].count)
1486                 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1487         if (parms[aflag].count) {
1488                 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1489 bad_lhost:
1490                         fprintf(stderr, "Illegal %s\n", "source address");
1491                         exit(1);
1492                 } else {
1493                         saddr = *((struct sockaddr_in *) res->ai_addr);
1494                         freeaddrinfo(res);
1495                 }
1496         }
1497         if (parms[uflag].count) 
1498                 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1499                         fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1500                         exit(1);
1501                 }
1502
1503
1504         /* Process collectors parameters. Brrrr... :-[ */
1505
1506         npeers = argc - optind;
1507         if (npeers > 1) {
1508                 /* Send to remote Netflow collector */
1509                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1510                 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1511                         dhost = argv[i];
1512                         if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1513                         *dport++ = 0;
1514                         if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1515                                 fprintf(stderr, "socket(): %s\n", strerror(errno));
1516                                 exit(1);
1517                         }
1518                         peers[npeers].write_fd = write_fd;
1519                         peers[npeers].type = PEER_MIRROR;
1520                         peers[npeers].laddr = saddr;
1521                         peers[npeers].seq = 0;
1522                         if ((lhost = strchr(dport, '/'))) {
1523                                 *lhost++ = 0;
1524                                 if ((type = strchr(lhost, '/'))) {
1525                                         *type++ = 0;
1526                                         switch (*type) {
1527                                                 case 0:
1528                                                 case 'm':
1529                                                         break;
1530
1531                                                 case 'r':
1532                                                         peers[npeers].type = PEER_ROTATE;
1533                                                         npeers_rot++;
1534                                                         break;
1535
1536                                                 default:
1537                                                         goto bad_collector;
1538                                         }
1539                                 }
1540                                 if (*lhost) {
1541                                         if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1542                                         peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1543                                         freeaddrinfo(res);
1544                                 }
1545                         }
1546                         if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1547                                                 sizeof(struct sockaddr_in))) {
1548                                 fprintf(stderr, "bind(): %s\n", strerror(errno));
1549                                 exit(1);
1550                         }
1551                         if (getaddrinfo(dhost, dport, &hints, &res)) {
1552 bad_collector:
1553                                 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1554                                 exit(1);
1555                         }
1556                         peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1557                         freeaddrinfo(res);
1558                         if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1559                                                 sizeof(struct sockaddr_in))) {
1560                                 fprintf(stderr, "connect(): %s\n", strerror(errno));
1561                                 exit(1);
1562                         }
1563
1564                         /* Restore command line */
1565                         if (type) *--type = '/';
1566                         if (lhost) *--lhost = '/';
1567                         *--dport = ':';
1568                 }
1569         }
1570         else if (parms[fflag].count) {
1571                 // log into a file
1572                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1573                 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1574                 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1575
1576                 peers[npeers].write_fd = START_DATA_FD;
1577                 peers[npeers].type = PEER_FILE;
1578                 peers[npeers].seq = 0;
1579
1580                 read_cur_epoch();
1581                 npeers++;
1582         }
1583         else 
1584                 usage();
1585
1586
1587         if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1588         ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1589         if (!ulog_handle) {
1590                 fprintf(stderr, "libipulog initialization error: %s",
1591                                 ipulog_strerror(ipulog_errno));
1592                 exit(1);
1593         }
1594         if (sockbufsize)
1595                 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1596                                         &sockbufsize, sizeof(sockbufsize)) < 0)
1597                         fprintf(stderr, "setsockopt(): %s", strerror(errno));
1598
1599         /* Daemonize (if log destination stdout-free) */
1600
1601         my_log_open(ident, verbosity, log_dest);
1602
1603         init_daemon();
1604
1605         if (!(log_dest & 2)) {
1606                 /* Crash-proofing - Sapan*/
1607                 while (1) {
1608                         int pid=fork();
1609                         if (pid==-1) {
1610                                 fprintf(stderr, "fork(): %s", strerror(errno));
1611                                 exit(1);
1612                         }
1613                         else if (pid==0) {
1614                                 setsid();
1615                                 freopen("/dev/null", "r", stdin);
1616                                 freopen("/dev/null", "w", stdout);
1617                                 freopen("/dev/null", "w", stderr);
1618                                 break;
1619                         }
1620                         else {
1621                                 while (wait3(NULL,0,NULL) < 1);
1622                         }
1623                 }
1624         } else {
1625                 setvbuf(stdout, (char *)0, _IONBF, 0);
1626                 setvbuf(stderr, (char *)0, _IONBF, 0);
1627         }
1628
1629         pid = getpid();
1630         sprintf(errpbuf, "[%ld]", (long) pid);
1631         strcat(ident, errpbuf);
1632
1633         /* Initialization */
1634
1635     init_slice_id_hash();
1636         hash_init(); /* Actually for crc16 only */
1637         mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1638         for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1639
1640 #ifdef UPTIME_TRICK
1641         /* Hope 12 days is enough :-/ */
1642         start_time_offset = 1 << 20;
1643
1644         /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1645 #endif
1646         gettime(&start_time);
1647
1648         /*
1649            Build static pending queue as circular buffer.
1650            */
1651         if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1652         pending_tail = pending_head;
1653         for (i = pending_queue_length - 1; i--;) {
1654                 if (!(pending_tail->next = mem_alloc())) {
1655 err_mem_alloc:
1656                         my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1657                         exit(1);
1658                 }
1659                 pending_tail = pending_tail->next;
1660         }
1661         pending_tail->next = pending_head;
1662         pending_tail = pending_head;
1663
1664         sigemptyset(&sig_mask);
1665         sigact.sa_handler = &sighandler;
1666         sigact.sa_mask = sig_mask;
1667         sigact.sa_flags = 0;
1668         sigaddset(&sig_mask, SIGTERM);
1669         sigaction(SIGTERM, &sigact, 0);
1670 #if ((DEBUG) & DEBUG_I)
1671         sigaddset(&sig_mask, SIGUSR1);
1672         sigaction(SIGUSR1, &sigact, 0);
1673 #endif
1674         if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1675                 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1676                 exit(1);
1677         }
1678
1679         my_log(LOG_INFO, "Starting %s...", VERSION);
1680
1681         if (parms[cflag].count) {
1682                 if (chdir(parms[cflag].arg) || chroot(".")) {
1683                         my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1684                         exit(1);
1685                 }
1686         }
1687
1688         schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1689         pthread_attr_init(&tattr);
1690         for (i = 0; i < THREADS - 1; i++) {
1691                 if (schedp.sched_priority > 0) {
1692                         if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1693                                         (pthread_attr_setschedparam(&tattr, &schedp))) {
1694                                 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1695                                 exit(1);
1696                         }
1697                 }
1698                 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1699                         my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1700                         exit(1);
1701                 }
1702                 pthread_detach(thid);
1703                 schedp.sched_priority++;
1704         }
1705
1706         if (pw) {
1707                 if (setgroups(0, NULL)) {
1708                         my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1709                         exit(1);
1710                 }
1711                 if (setregid(pw->pw_gid, pw->pw_gid)) {
1712                         my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1713                         exit(1);
1714                 }
1715                 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1716                         my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1717                         exit(1);
1718                 }
1719         }
1720
1721         if (!(pidfile = fopen(pidfilepath, "w")))
1722                 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1723         else {
1724                 fprintf(pidfile, "%ld\n", (long) pid);
1725                 fclose(pidfile);
1726         }
1727
1728         my_log(LOG_INFO, "pid: %d", pid);
1729         my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1730                         "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1731                         ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1732                         netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1733                         memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1734                         emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1735                         parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1736         for (i = 0; i < nsnmp_rules; i++) {
1737                 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1738                                 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1739         }
1740         for (i = 0; i < npeers; i++) {
1741                 switch (peers[i].type) {
1742                         case PEER_MIRROR:
1743                                 c = 'm';
1744                                 break;
1745                         case PEER_ROTATE:
1746                                 c = 'r';
1747                                 break;
1748                 }
1749                 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1750                 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1751                                 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1752         }
1753
1754         pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1755
1756         timeout.tv_usec = 0;
1757         while (!killed
1758                         || (total_elements - free_elements - pending_queue_length)
1759                         || emit_count
1760                         || pending_tail->flags) {
1761
1762                 if (!sigs) {
1763                         timeout.tv_sec = scan_interval;
1764                         select(0, 0, 0, 0, &timeout);
1765                 }
1766
1767                 if (sigs & SIGTERM_MASK && !killed) {
1768                         sigs &= ~SIGTERM_MASK;
1769                         my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1770                         scan_interval = 1;
1771                         frag_lifetime = -1;
1772                         active_lifetime = -1;
1773                         inactive_lifetime = -1;
1774                         emit_timeout = 1;
1775                         unpending_timeout = 1;
1776                         killed = 1;
1777                         pthread_cond_signal(&scan_cond);
1778                         pthread_cond_signal(&unpending_cond);
1779                 }
1780
1781 #if ((DEBUG) & DEBUG_I)
1782                 if (sigs & SIGUSR1_MASK) {
1783                         sigs &= ~SIGUSR1_MASK;
1784                         info_debug();
1785                 }
1786 #endif
1787         }
1788         remove(pidfilepath);
1789 #if ((DEBUG) & DEBUG_I)
1790         info_debug();
1791 #endif
1792         my_log(LOG_INFO, "Done.");
1793 #ifdef WALL
1794         return 0;
1795 #endif
1796 }