1 /*
2         Copyright (C) Slava Astashonok <sla@0n.ru>
3
4         This program is free software; you can redistribute it and/or
5         modify it under the terms of the GNU General Public License.
6
7         $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
8
9                         Sapan Bhatia <sapanb@cs.princeton.edu> 
10                         
11         7/11/2007       Added data collection (-f) functionality, slice_id support in the header and log file
12                         rotation.
13
14         15/11/2007      Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
15
16 */
17
18 #include <common.h>
19
20 /* stdout, stderr, freopen() */
21 #include <stdio.h>
22
23 /* atoi(), exit() */
24 #include <stdlib.h>
25
26 /* getopt(), alarm(), getpid(), setsid(), chdir() */
27 #include <unistd.h>
28
29 /* strerror() */
30 #include <string.h>
31
32 /* sig*() */
33 #include <signal.h>
34
35 /* statfs() */
36
37 #include <sys/vfs.h>
38
39 #include <libipulog/libipulog.h>
40 #include "vserver.h"
41
42 struct ipulog_handle {
43         int fd;
44         u_int8_t blocking;
45         struct sockaddr_nl local;
46         struct sockaddr_nl peer;
47         struct nlmsghdr* last_nlhdr;
48 };
49
50 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
51 #include <sys/types.h>
52 #include <netinet/in_systm.h>
53 #include <sys/socket.h>
54 #include <netinet/in.h>
55 #include <arpa/inet.h>
56 #include <netinet/ip.h>
57 #include <netinet/tcp.h>
58 #include <netinet/udp.h>
59 #include <netinet/ip_icmp.h>
60 #include <net/if.h>
61
62 #include <sys/param.h>
63 #include <pwd.h>
64 #ifdef OS_LINUX
65 #include <grp.h>
66 #endif
67
68 /* pthread_*() */
69 #include <pthread.h>
70
71 /* errno */
72 #include <errno.h>
73
74 /* getaddrinfo() */
75 #include <netdb.h>
76
77 /* nanosleep() */
78 #include <time.h>
79
80 /* gettimeofday() */
81 #include <sys/time.h>
82
83 /* scheduling */
84 #include <sched.h>
85
86 /* select() (POSIX)*/
87 #include <sys/select.h>
88
89 /* open() */
90 #include <sys/stat.h>
91 #include <fcntl.h>
92
93 #include <fprobe-ulog.h>
94 #include <my_log.h>
95 #include <my_getopt.h>
96 #include <netflow.h>
97 #include <hash.h>
98 #include <mem.h>
99
100 #define PIDFILE                 "/var/log/fprobe-ulog.pid"
101 #define LAST_EPOCH_FILE         "/var/log/fprobe_last_epoch"
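/* The epoch id is persisted in LAST_EPOCH_FILE as a decimal string, so the
 * buffer needs room for up to five digits plus the terminating NUL. */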
102 #define MAX_EPOCH_SIZE          sizeof("32767")
103 #define STD_NETFLOW_PDU
104
105 enum {
106         aflag,
107         Bflag,
108         bflag,
109         cflag,
110         dflag,
111         Dflag,
112         eflag,
113         Eflag,
114         fflag,
115         gflag,
116         hflag,
117         lflag,
118         mflag,
119         Mflag,
120         nflag,
121         qflag,
122         rflag,
123         sflag,
124         tflag,
125         Tflag,
126         Uflag,
127         uflag,
128         vflag,
129         Wflag,
130         Xflag,
131 };
132
133 static struct getopt_parms parms[] = {
134         {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
135         {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
136         {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
137         {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
138         {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
139         {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
140         {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
141         {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
142         {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
143         {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
144         {'h', 0, 0, 0},
145         {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
146         {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
147         {'M', 0, 0, 0},
148         {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
149         {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
150         {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
151         {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
152         {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
153         {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
154         {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
155         {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
156         {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
157         {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
158         {0, 0, 0, 0}
159 };
160
161 extern char *optarg;
162 extern int optind, opterr, optopt;
163 extern int errno;
164
165 extern struct NetFlow NetFlow1;
166 extern struct NetFlow NetFlow5;
167 extern struct NetFlow NetFlow7;
168
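/* Sentinel meaning "no data file opened yet": get_data_file_fd() skips the
 * free-space check and opens a fresh output file when it sees this value. */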
169 #define START_DATA_FD -5
170 #define mark_is_tos parms[Mflag].count
171 static unsigned scan_interval = 5;
172 static unsigned int min_free = 0;
173 static int frag_lifetime = 30;
174 static int inactive_lifetime = 60;
175 static int active_lifetime = 300;
176 static int sockbufsize;
177 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
178 #if (MEM_BITS == 0) || (MEM_BITS == 16)
179 #define BULK_QUANTITY 10000
180 #else
181 #define BULK_QUANTITY 200
182 #endif
183
184 static unsigned epoch_length=60, log_epochs=1; 
185 static unsigned cur_epoch=0,prev_uptime=0,last_peak=0;
186
187 static unsigned bulk_quantity = BULK_QUANTITY;
188 static unsigned pending_queue_length = 100;
189 static struct NetFlow *netflow = &NetFlow5;
190 static unsigned verbosity = 6;
191 static unsigned log_dest = MY_LOG_SYSLOG;
192 static struct Time start_time;
193 static long start_time_offset;
194 static int off_tl;
195 /* From mem.c */
196 extern unsigned total_elements;
197 extern unsigned free_elements;
198 extern unsigned total_memory;
199 #if ((DEBUG) & DEBUG_I)
200 static unsigned emit_pkts, emit_queue;
201 static uint64_t size_total;
202 static unsigned pkts_total, pkts_total_fragmented;
203 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
204 static unsigned pkts_pending, pkts_pending_done;
205 static unsigned pending_queue_trace, pending_queue_trace_candidate;
206 static unsigned flows_total, flows_fragmented;
207 #endif
208 static unsigned emit_count;
209 static uint32_t emit_sequence;
210 static unsigned emit_rate_bytes, emit_rate_delay;
211 static struct Time emit_time;
212 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
213 static pthread_t thid;
214 static sigset_t sig_mask;
215 static struct sched_param schedp;
216 static int sched_min, sched_max;
217 static int npeers, npeers_rot;
218 static struct peer *peers;
219 static int sigs;
220 static char cur_output_file[MAX_PATH_LEN];
221
222 static struct Flow *flows[1 << HASH_BITS];
223 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
224
225 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
226 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
227
228 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
229 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
230 static struct Flow *pending_head, *pending_tail;
231 static struct Flow *scan_frag_dreg;
232
233 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
234 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
235 static struct Flow *flows_emit;
236
237 static char ident[256] = "fprobe-ulog";
238 static FILE *pidfile;
239 static char *pidfilepath;
240 static pid_t pid;
241 static int killed;
242 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
243 static struct ipulog_handle *ulog_handle;
244 static uint32_t ulog_gmask = 1;
245 static char *cap_buf;
246 static int nsnmp_rules;
247 static struct snmp_rule *snmp_rules;
248 static struct passwd *pw = 0;
249
250 void usage()
251 {
252         fprintf(stdout,
253                         "fprobe-ulog: a NetFlow probe. Version %s\n"
254                         "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
255                         "\n"
256                         "-h\t\tDisplay this help\n"
257                         "-U <mask>\tULOG group bitwise mask [1]\n"
258                         "-s <seconds>\tHow often to scan for expired flows [5]\n"
259                         "-g <seconds>\tFragmented flow lifetime [30]\n"
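                        "-d <seconds>\tInactive flow lifetime (inactive timer) [60]\n"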
260                         "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
261                         "-f <filename>\tLog flow data in a file\n"
262                         "-G <graduation period>\tRotate logs on a 0=hourly or 1=daily basis\n"
263                         "-n <version>\tNetFlow version to use (1, 5 or 7) [5]\n"
264                         "-a <address>\tUse <address> as the source address for NetFlow packets\n"
265                         "-X <rules>\tInterface name to SNMP-index conversion rules\n"
266                         "-M\t\tUse netfilter mark value as ToS flag\n"
267                         "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
268                         "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
269                         "-q <flows>\tPending queue length [100]\n"
270                         "-B <kilobytes>\tKernel capture buffer size [0]\n"
271                         "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
272                         "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
273                         "-c <directory>\tDirectory to chroot to\n"
274                         "-u <user>\tUser to run as\n"
275                         "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
276                         "-l <[dst][:id]>\tLog destination and log/pidfile identifier [1]\n"
277                         "-y <remote:port>\tAddress of the NetFlow collector\n"
278                         "-f <writable file>\tFile to write data into\n"
279                         "-T <n>\tRotate log file every n epochs\n"
280                         "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
281                         "-E <[1..60]>\tSize of an epoch in minutes\n"
282                         "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
283                         ,
284                 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
285         exit(0);
286 }
287
288 #if ((DEBUG) & DEBUG_I)
289 void info_debug()
290 {
291         my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
292                         pkts_total, pkts_total_fragmented, size_total,
293                         pkts_pending - pkts_pending_done, pending_queue_trace);
294         my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
295                         pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
296         my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
297                         flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
298         my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
299                         total_elements, free_elements, total_memory);
300 }
301 #endif
302
303 void sighandler(int sig)
304 {
305         switch (sig) {
306                 case SIGTERM:
307                         sigs |= SIGTERM_MASK;
308                         break;
309 #if ((DEBUG) & DEBUG_I)
310                 case SIGUSR1:
311                         sigs |= SIGUSR1_MASK;
312                         break;
313 #endif
314         }
315 }
316
317 void gettime(struct Time *now)
318 {
319         struct timeval t;
320
321         gettimeofday(&t, 0);
322         now->sec = t.tv_sec;
323         now->usec = t.tv_usec;
324 }
325
326
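/* Difference between t1 and t2 in whole minutes */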
327 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
328 {
329         return (t1->sec - t2->sec)/60;
330 }
331
332 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
333 {
334         return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
335 }
336
337 /* Uptime in milliseconds */
338 uint32_t getuptime(struct Time *t)
339 {
340         /* Maximum uptime is about 49/2 days */
341         return cmpmtime(t, &start_time);
342 }
343
344 /* Uptime in minutes */
345 uint32_t getuptime_minutes(struct Time *t)
346 {
347         /* Maximum uptime is about 49/2 days */
348         return cmpMtime(t, &start_time);
349 }
350
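/* Hash a flow over its fragment key (Flow_F) when fragmented, otherwise over
 * the full transport-level tuple (Flow_TL). */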
351 hash_t hash_flow(struct Flow *flow)
352 {
353         if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
354         else return hash(flow, sizeof(struct Flow_TL));
355 }
356
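/* Map an interface name to an SNMP index: try the -X conversion rules first
 * (e.g. a rule with basename "eth" and base 100 maps "eth2" to 102), then fall
 * back to the kernel's if_nametoindex(). */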
357 uint16_t snmp_index(char *name) {
358         uint32_t i;
359
360         if (!*name) return 0;
361
362         for (i = 0; (int) i < nsnmp_rules; i++) {
363                 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
364                 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
365         }
366
367         if ((i = if_nametoindex(name))) return i;
368
369         return -1;
370 }
371
372 inline void copy_flow(struct Flow *src, struct Flow *dst)
373 {
374         dst->iif = src->iif;
375         dst->oif = src->oif;
376         dst->sip = src->sip;
377         dst->dip = src->dip;
378         dst->tos = src->tos;
379         dst->slice_id = src->slice_id;
380         dst->proto = src->proto;
381         dst->tcp_flags = src->tcp_flags;
382         dst->id = src->id;
383         dst->sp = src->sp;
384         dst->dp = src->dp;
385         dst->pkts = src->pkts;
386         dst->size = src->size;
387         dst->sizeF = src->sizeF;
388         dst->sizeP = src->sizeP;
389         dst->ctime = src->ctime;
390         dst->mtime = src->mtime;
391         dst->flags = src->flags;
392 }
393
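/* Restore the epoch counter saved by update_cur_epoch_file() so that log
 * numbering resumes where it left off after a restart. */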
394 void read_cur_epoch() {
395         int fd;
396         /* Reset to -1 in case the read fails */
397         cur_epoch=-1;
398         fd = open(LAST_EPOCH_FILE, O_RDONLY);
399         if (fd != -1) {
400                 char snum[MAX_EPOCH_SIZE];
401                 ssize_t len;
402                 len = read(fd, snum, MAX_EPOCH_SIZE-1);
403                 if (len != -1) {
404                         snum[len]='\0';
405                         sscanf(snum,"%d",&cur_epoch);
406                         cur_epoch++; /* Let's not clobber the last epoch */
407                         close(fd);
408                 }
409         }
410         return;
411 }
412
413
414 /* Dumps the current epoch in a file to cope with
415  * reboots and killings of fprobe */
416
417 void update_cur_epoch_file(int n) {
418         int fd, len;
419         char snum[MAX_EPOCH_SIZE];
420         len=snprintf(snum, MAX_EPOCH_SIZE-1,"%d", n);
421         fd = open(LAST_EPOCH_FILE, O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
422         if (fd == -1) {
423                 my_log(LOG_ERR, "open() failed: %s. The next restart will resume logging from epoch id 0.", LAST_EPOCH_FILE);
424                 return;
425         }
426         write(fd, snum, len);
427         close(fd);
428 }
429
430 /* Get the file descriptor corresponding to the current file.
431  * The kludgy implementation is to abstract away the 'current
432  * file descriptor', which may also be a socket. 
433  */
434
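/* Rotation policy: each epoch's records go to "<fname>.<epoch>"; after
 * log_epochs epochs the counter wraps and old files are overwritten. A
 * finished file is gzip-compressed, and if the disk drops below min_free
 * blocks at the highest epoch reached, the counter resets to epoch 0 and
 * older data is dropped by reuse. */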
435 unsigned get_data_file_fd(char *fname, int cur_fd) {
436         struct Time now;
437         unsigned cur_uptime;
438
439         struct statfs statfs;
440         int ret_fd;
441
442         /* If the amount of free space left on the disk falls below the threshold, start reusing
443          * our own log files, or drop data if that doesn't solve the problem */
444         gettime(&now);
445         cur_uptime = getuptime_minutes(&now);
446
447         if (cur_fd != START_DATA_FD) {
448                 if (fstatfs(cur_fd, &statfs) == -1) {
449                         my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
450                 }
451                 else {
452                         if (min_free && (statfs.f_bavail < min_free)
453                                         && (cur_epoch==last_peak))
454                         {
455                                 my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
456                                 cur_epoch = -1;
457                         }
458                         /*
459                            else 
460                            assume that we can reclaim space by overwriting our own files
461                            and that the difference in size will not fill the disk - sapan
462                            */
463                 }
464         }
465
466         /* If epoch length has been exceeded, 
467          * or we're starting up 
468          * or we're going back to the first epoch */
469         if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
470                 int write_fd;
471                 prev_uptime = cur_uptime;
472                 cur_epoch = (cur_epoch + 1) % log_epochs;
473                 if (cur_epoch>last_peak) last_peak = cur_epoch;
474                 if (cur_fd>0) {
475                         close(cur_fd);
476                         /* Compress the finished file */
477                         char gzip_cmd[MAX_PATH_LEN+sizeof("gzip -f ")];
478                         snprintf(gzip_cmd, MAX_PATH_LEN+sizeof("gzip -f "),"gzip -f %s",cur_output_file);
479                         system(gzip_cmd);
480                 }
481                 snprintf(cur_output_file,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
482                 if ((write_fd = open(cur_output_file, O_RDWR|O_CREAT|O_TRUNC, S_IRUSR|S_IWUSR)) < 0) {
483                         my_log(LOG_ERR, "open(): %s (%s)\n", cur_output_file, strerror(errno));
484                         exit(1);
485                 }
486                 if (fchmod(write_fd,S_IRUSR|S_IWUSR|S_IROTH|S_IRGRP) == -1) {
487                         my_log(LOG_ERR, "fchmod() failed: %s (%s). Continuing...\n", cur_output_file, strerror(errno));
488                 }
489                 update_cur_epoch_file(cur_epoch);
490                 ret_fd = write_fd;
491         }
492         else {
493                 ret_fd = cur_fd;
494         }
495         return(ret_fd);
496 }
497
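/* Search hash chain 'where' for an entry matching 'what': after matching
 * addresses and protocol, unfragmented flows must also match on ports and
 * fragmented flows on the IP id. If 'prev' is given it receives the address
 * of the link pointing at the returned entry. */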
498 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
499 {
500         struct Flow **flowpp;
501
502 #ifdef WALL
503         flowpp = 0;
504 #endif
505
506         if (prev) flowpp = *prev;
507
508         while (where) {
509                 if (where->sip.s_addr == what->sip.s_addr
510                                 && where->dip.s_addr == what->dip.s_addr
511                                 && where->proto == what->proto) {
512                         switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
513                                 case 0:
514                                         /* Both unfragmented */
515                                         if ((what->sp == where->sp)
516                                                         && (what->dp == where->dp)) goto done;
517                                         break;
518                                 case 2:
519                                         /* Both fragmented */
520                                         if (where->id == what->id) goto done;
521                                         break;
522                         }
523                 }
524                 flowpp = &where->next;
525                 where = where->next;
526         }
527 done:
528         if (prev) *prev = flowpp;
529         return where;
530 }
531
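/* Insert 'flow' into the flow hash or merge it into an existing entry.
 * COPY_INTO allocates a new element and copies the flow; MOVE_INTO hands over
 * ownership (the element is freed when merged). A fragmented flow whose
 * fragments have all arrived is unlinked and re-inserted as a regular flow. */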
532 int put_into(struct Flow *flow, int flag
533 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
534                         , char *logbuf
535 #endif
536                     )
537 {
538         int ret = 0;
539         hash_t h;
540         struct Flow *flown, **flowpp;
541 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
542         char buf[64];
543 #endif
544
545         h = hash_flow(flow);
546 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
547         sprintf(buf, " %x H:%04x", (unsigned) flow, h);
548         strcat(logbuf, buf);
549 #endif
550         pthread_mutex_lock(&flows_mutex[h]);
551         flowpp = &flows[h];
552         if (!(flown = find(flows[h], flow, &flowpp))) {
553                 /* No suitable flow found - add */
554                 if (flag == COPY_INTO) {
555                         if ((flown = mem_alloc())) {
556                                 copy_flow(flow, flown);
557                                 flow = flown;
558                         } else {
559 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
560                                 my_log(LOG_ERR, "%s %s. %s",
561                                                 "mem_alloc():", strerror(errno), "packet lost");
562 #endif
563                                 return -1;
564                         }
565                 }
566                 flow->next = flows[h];
567                 flows[h] = flow;
568 #if ((DEBUG) & DEBUG_I)
569                 flows_total++;
570                 if (flow->flags & FLOW_FRAG) flows_fragmented++;
571 #endif
572 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
573                 if (flown) {
574                         sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
575                         strcat(logbuf, buf);
576                 }
577 #endif
578         } else {
579                 /* Found suitable flow - update */
580 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
581                 sprintf(buf, " +> %x", (unsigned) flown);
582                 strcat(logbuf, buf);
583 #endif
584                 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
585                         flown->mtime = flow->mtime;
586                 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
587                         flown->ctime = flow->ctime;
588                 flown->tcp_flags |= flow->tcp_flags;
589                 flown->size += flow->size;
590                 flown->pkts += flow->pkts;
591
592                 /* The slice_id of the first packet of a flow can be misleading. Reset the slice_id of the flow
593                  * if a better value comes along. A good example of this is that by the time CoDemux sets the
594                  * peercred of a flow, it has already been accounted for here and attributed to root. */
595
596                 if (flown->slice_id<1)
597                                 flown->slice_id = flow->slice_id;
598
599
600                 if (flow->flags & FLOW_FRAG) {
601                         /* Fragmented flow require some additional work */
602                         if (flow->flags & FLOW_TL) {
603                                 /*
604                                    ?FIXME?
605                                    Several packets with FLOW_TL (attack)
606                                    */
607                                 flown->sp = flow->sp;
608                                 flown->dp = flow->dp;
609                         }
610                         if (flow->flags & FLOW_LASTFRAG) {
611                                 /*
612                                    ?FIXME?
613                                    Several packets with FLOW_LASTFRAG (attack)
614                                    */
615                                 flown->sizeP = flow->sizeP;
616                         }
617                         flown->flags |= flow->flags;
618                         flown->sizeF += flow->sizeF;
619                         if ((flown->flags & FLOW_LASTFRAG)
620                                         && (flown->sizeF >= flown->sizeP)) {
621                                 /* All fragments received - flow reassembled */
622                                 *flowpp = flown->next;
623                                 pthread_mutex_unlock(&flows_mutex[h]);
624 #if ((DEBUG) & DEBUG_I)
625                                 flows_total--;
626                                 flows_fragmented--;
627 #endif
628                                 flown->id = 0;
629                                 flown->flags &= ~FLOW_FRAG;
630 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
631                                 strcat(logbuf," R");
632 #endif
633                                 ret = put_into(flown, MOVE_INTO
634 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
635                                                 , logbuf
636 #endif
637                                               );
638                         }
639                 }
640                 if (flag == MOVE_INTO) mem_free(flow);
641         }
642         pthread_mutex_unlock(&flows_mutex[h]);
643         return ret;
644 }
645
646 int onlyonce=0;
647
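/* Serialize 'fields' values into the packet buffer at 'p', following the
 * field-id array 'format' (flow == 0 when filling header fields). Returns the
 * advanced write pointer. */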
648 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
649 {
650         int i;
651
652         for (i = 0; i < fields; i++) {
653 #if ((DEBUG) & DEBUG_F)
654                 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
655 #endif
656                 switch (format[i]) {
657                         case NETFLOW_IPV4_SRC_ADDR:
658                                 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
659                                 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
660                                 break;
661
662                         case NETFLOW_IPV4_DST_ADDR:
663                                 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
664                                 if ((flow->dip.s_addr == inet_addr("10.0.0.8"))) {
665                                         my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
666                                 }
667                                 p += NETFLOW_IPV4_DST_ADDR_SIZE;
668                                 break;
669
670                         case NETFLOW_INPUT_SNMP:
671                                 *((uint16_t *) p) = htons(flow->iif);
672                                 p += NETFLOW_INPUT_SNMP_SIZE;
673                                 break;
674
675                         case NETFLOW_OUTPUT_SNMP:
676                                 *((uint16_t *) p) = htons(flow->oif);
677                                 p += NETFLOW_OUTPUT_SNMP_SIZE;
678                                 break;
679
680                         case NETFLOW_PKTS_32:
681                                 *((uint32_t *) p) = htonl(flow->pkts);
682                                 p += NETFLOW_PKTS_32_SIZE;
683                                 break;
684
685                         case NETFLOW_BYTES_32:
686                                 *((uint32_t *) p) = htonl(flow->size);
687                                 p += NETFLOW_BYTES_32_SIZE;
688                                 break;
689
690                         case NETFLOW_FIRST_SWITCHED:
691                                 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
692                                 p += NETFLOW_FIRST_SWITCHED_SIZE;
693                                 break;
694
695                         case NETFLOW_LAST_SWITCHED:
696                                 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
697                                 p += NETFLOW_LAST_SWITCHED_SIZE;
698                                 break;
699
700                         case NETFLOW_L4_SRC_PORT:
701                                 *((uint16_t *) p) = flow->sp;
702                                 p += NETFLOW_L4_SRC_PORT_SIZE;
703                                 break;
704
705                         case NETFLOW_L4_DST_PORT:
706                                 *((uint16_t *) p) = flow->dp;
707                                 p += NETFLOW_L4_DST_PORT_SIZE;
708                                 break;
709
710                         case NETFLOW_PROT:
711                                 *((uint8_t *) p) = flow->proto;
712                                 p += NETFLOW_PROT_SIZE;
713                                 break;
714
715                         case NETFLOW_SRC_TOS:
716                                 *((uint8_t *) p) = flow->tos;
717                                 p += NETFLOW_SRC_TOS_SIZE;
718                                 break;
719
720                         case NETFLOW_TCP_FLAGS:
721                                 *((uint8_t *) p) = flow->tcp_flags;
722                                 p += NETFLOW_TCP_FLAGS_SIZE;
723                                 break;
724
725                         case NETFLOW_VERSION:
726                                 *((uint16_t *) p) = htons(netflow->Version);
727                                 p += NETFLOW_VERSION_SIZE;
728                                 break;
729
730                         case NETFLOW_COUNT:
731                                 *((uint16_t *) p) = htons(emit_count);
732                                 p += NETFLOW_COUNT_SIZE;
733                                 break;
734
735                         case NETFLOW_UPTIME:
736                                 *((uint32_t *) p) = htonl(getuptime(&emit_time));
737                                 p += NETFLOW_UPTIME_SIZE;
738                                 break;
739
740                         case NETFLOW_UNIX_SECS:
741                                 *((uint32_t *) p) = htonl(emit_time.sec);
742                                 p += NETFLOW_UNIX_SECS_SIZE;
743                                 break;
744
745                         case NETFLOW_UNIX_NSECS:
746                                 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
747                                 p += NETFLOW_UNIX_NSECS_SIZE;
748                                 break;
749
750                         case NETFLOW_FLOW_SEQUENCE:
751                                 //*((uint32_t *) p) = htonl(emit_sequence);
752                                 *((uint32_t *) p) = 0;
753                                 p += NETFLOW_FLOW_SEQUENCE_SIZE;
754                                 break;
755
756                         case NETFLOW_PAD8:
757                                 /* Unsupported (uint8_t) */
758                         case NETFLOW_ENGINE_TYPE:
759                         case NETFLOW_ENGINE_ID:
760                         case NETFLOW_FLAGS7_1:
761                         case NETFLOW_SRC_MASK:
762                         case NETFLOW_DST_MASK:
763                                 if (!onlyonce) {
764                                         my_log(LOG_CRIT, "Adding SRC/DST masks: this version of fprobe is seriously broken\n");
765                                         onlyonce=1;
766                                 }
767                                 *((uint8_t *) p) = 0;
768                                 p += NETFLOW_PAD8_SIZE;
769                                 break;
770                         case NETFLOW_SLICE_ID:
771                                 *((uint32_t *) p) = flow->slice_id;
772                                 p += NETFLOW_SLICE_ID_SIZE;
773                                 break;
774                         case NETFLOW_PAD16:
775                                 /* Unsupported (uint16_t) */
776                         case NETFLOW_SRC_AS:
777                         case NETFLOW_DST_AS:
778                         case NETFLOW_FLAGS7_2:
779                                 *((uint16_t *) p) = 0;
780                                 p += NETFLOW_PAD16_SIZE;
781                                 break;
782
783                         case NETFLOW_PAD32:
784                                 /* Unsupported (uint32_t) */
785                         case NETFLOW_IPV4_NEXT_HOP:
786                         case NETFLOW_ROUTER_SC:
787                                 *((uint32_t *) p) = 0;
788                                 p += NETFLOW_PAD32_SIZE;
789                                 break;
790
791                         default:
792                                 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
793                                                 format, i, format[i]);
794                                 exit(1);
795                 }
796         }
797 #if ((DEBUG) & DEBUG_F)
798         my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
799 #endif
800         return p;
801 }
802
803 void setuser() {
804         /*
805            Workaround for clone()-based threads
806            Try to change EUID independently of main thread
807            */
808         if (pw) {
809                 setgroups(0, NULL);
810                 setregid(pw->pw_gid, pw->pw_gid);
811                 setreuid(pw->pw_uid, pw->pw_uid);
812         }
813 }
814
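/* Pull expired flows off the emit queue, pack them into NetFlow PDUs and
 * write each PDU to the file and mirror peers, and to rotate peers in
 * round-robin order. A partly filled PDU is flushed after emit_timeout
 * seconds. */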
815 void *emit_thread()
816 {
817         struct Flow *flow;
818         void *p;
819         struct timeval now;
820         struct timespec timeout;
821         int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
822
823         p = (void *) &emit_packet + netflow->HeaderSize;
824         timeout.tv_nsec = 0;
825
826         setuser();
827         
828     //pthread_mutexattr_setprotocol(&md->MutexAttr,PTHREAD_PRIO_INHERIT);
829
830         for (;;) {
831                 pthread_mutex_lock(&emit_mutex);
832                 while (!flows_emit) {
833                         gettimeofday(&now, 0);
834                         timeout.tv_sec = now.tv_sec + emit_timeout;
835                         /* Do not wait until emit_packet is filled - it may take too long */
836                         if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
837                                 my_log(LOG_INFO, "Timeout: %d, %ld", emit_count, (long) timeout.tv_sec);
838                                 pthread_mutex_unlock(&emit_mutex);
839                                 goto sendit;
840                         }
841                 }
842                 flow = flows_emit;
843                 flows_emit = flows_emit->next;
844 #if ((DEBUG) & DEBUG_I)
845                 emit_queue--;
846 #endif          
847                 pthread_mutex_unlock(&emit_mutex);
848
849 #ifdef UPTIME_TRICK
850                 if (!emit_count) {
851                         gettime(&start_time);
852                         start_time.sec -= start_time_offset;
853                 }
854 #endif
855                 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
856                 mem_free(flow);
857                 emit_count++;
858 #ifdef PF2_DEBUG
859                 printf("Emit count = %d\n", emit_count);
860                 fflush(stdout);
861 #endif
862                 if (emit_count == netflow->MaxFlows) {
863 sendit:
864                         gettime(&emit_time);
865                         p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
866                         size = netflow->HeaderSize + emit_count * netflow->FlowSize;
867                         /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
868 #ifdef STD_NETFLOW_PDU
869                         if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
870 #endif
871                         peer_rot_cur = 0;
872                         for (i = 0; i < npeers; i++) {
873                                 if (peers[i].type == PEER_FILE) {
874                                         if (netflow->SeqOffset)
875                                                 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
876                                         peers[i].write_fd = get_data_file_fd(peers[i].fname, peers[i].write_fd);
877                                         ret = write(peers[i].write_fd, emit_packet, size);
878                                         if (ret < size) {
879
880 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
881                                                 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
882                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
883 #endif
884 #undef MESSAGES
885                                         }
886 #if ((DEBUG) & DEBUG_E)
887                                         else {
888                                                 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
889                                                                 emit_count, i + 1, peers[i].seq);
890                                         }
891 #endif
892                                         peers[i].seq += emit_count;
893
894                                         /* Rate limit */
895                                         if (emit_rate_bytes) {
896                                                 sent += size;
897                                                 delay = sent / emit_rate_bytes;
898                                                 if (delay) {
899                                                         sent %= emit_rate_bytes;
900                                                         timeout.tv_sec = 0;
901                                                         timeout.tv_nsec = emit_rate_delay * delay;
902                                                         while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
903                                                 }
904                                         }
905                                 }
906                                 else
907                                         if (peers[i].type == PEER_MIRROR) goto sendreal;
908                                         else
909                                                 if (peers[i].type == PEER_ROTATE) 
910                                                         if (peer_rot_cur++ == peer_rot_work) {
911 sendreal:
912                                                                 if (netflow->SeqOffset)
913                                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
914                                                                 ret = send(peers[i].write_fd, emit_packet, size, 0);
915                                                                 if (ret < size) {
916 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
917                                                                         my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
918                                                                                         i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
919 #endif
920                                                                 }
921 #if ((DEBUG) & DEBUG_E)
922                                                                 else {
923                                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
924                                                                                         emit_count, i + 1, peers[i].seq);
925                                                                 }
926 #endif
927                                                                 peers[i].seq += emit_count;
928
929                                                                 /* Rate limit */
930                                                                 if (emit_rate_bytes) {
931                                                                         sent += size;
932                                                                         delay = sent / emit_rate_bytes;
933                                                                         if (delay) {
934                                                                                 sent %= emit_rate_bytes;
935                                                                                 timeout.tv_sec = 0;
936                                                                                 timeout.tv_nsec = emit_rate_delay * delay;
937                                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
938                                                                         }
939                                                                 }
940                                                         }
941                         }
942                         if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
943                         emit_sequence += emit_count;
944                         emit_count = 0;
945 #if ((DEBUG) & DEBUG_I)
946                         emit_pkts++;
947 #endif
948                 }
949         }
950 }       
951
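/* Drain the pending ring filled by cap_thread(), merging each captured packet
 * into the flow hash via put_into(). */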
952 void *unpending_thread()
953 {
954         struct timeval now;
955         struct timespec timeout;
956 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
957         char logbuf[256];
958 #endif
959
960         setuser();
961
962         timeout.tv_nsec = 0;
963         pthread_mutex_lock(&unpending_mutex);
964
965         for (;;) {
966                 while (!(pending_tail->flags & FLOW_PENDING)) {
967                         gettimeofday(&now, 0);
968                         timeout.tv_sec = now.tv_sec + unpending_timeout;
969                         pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
970                 }
971
972 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
973                 *logbuf = 0;
974 #endif
975                 if (put_into(pending_tail, COPY_INTO
976 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
977                                         , logbuf
978 #endif
979                             ) < 0) {
980 #if ((DEBUG) & DEBUG_I)
981                         pkts_lost_unpending++;
982 #endif                          
983                 }
984
985 #if ((DEBUG) & DEBUG_U)
986                 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
987 #endif
988
989                 pending_tail->flags = 0;
990                 pending_tail = pending_tail->next;
991 #if ((DEBUG) & DEBUG_I)
992                 pkts_pending_done++;
993 #endif
994         }
995 }
996
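/* Every scan_interval seconds walk the flow hash: move flows that hit the
 * inactive or active timeout onto the emit queue, and requeue expired
 * fragmented flows (via scan_frag_dreg) as ordinary flows. */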
997 void *scan_thread()
998 {
999 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1000         char logbuf[256];
1001 #endif
1002         int i;
1003         struct Flow *flow, **flowpp;
1004         struct Time now;
1005         struct timespec timeout;
1006
1007         setuser();
1008
1009         timeout.tv_nsec = 0;
1010         pthread_mutex_lock(&scan_mutex);
1011
1012         for (;;) {
1013                 gettime(&now);
1014                 timeout.tv_sec = now.sec + scan_interval;
1015                 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
1016
1017                 gettime(&now);
1018 #if ((DEBUG) & DEBUG_S)
1019                 my_log(LOG_DEBUG, "S: %d", now.sec);
1020 #endif
1021                 for (i = 0; i < 1 << HASH_BITS ; i++) {
1022                         pthread_mutex_lock(&flows_mutex[i]);
1023                         flow = flows[i];
1024                         flowpp = &flows[i];
1025                         while (flow) {
1026                                 if (flow->flags & FLOW_FRAG) {
1027                                         /* Process fragmented flow */
1028                                         if ((now.sec - flow->mtime.sec) > frag_lifetime) {
1029                                                 /* Fragmented flow expired - put it into special chain */
1030 #if ((DEBUG) & DEBUG_I)
1031                                                 flows_fragmented--;
1032                                                 flows_total--;
1033 #endif
1034                                                 *flowpp = flow->next;
1035                                                 flow->id = 0;
1036                                                 flow->flags &= ~FLOW_FRAG;
1037                                                 flow->next = scan_frag_dreg;
1038                                                 scan_frag_dreg = flow;
1039                                                 flow = *flowpp;
1040                                                 continue;
1041                                         }
1042                                 } else {
1043                                         /* Flow is not fragmented */
1044                                         if ((now.sec - flow->mtime.sec) > inactive_lifetime
1045                                                         || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
1046                                                 /* Flow expired */
1047 #if ((DEBUG) & DEBUG_S)
1048                                                 my_log(LOG_DEBUG, "S: E %x", flow);
1049 #endif
1050 #if ((DEBUG) & DEBUG_I)
1051                                                 flows_total--;
1052 #endif
1053                                                 *flowpp = flow->next;
1054                                                 pthread_mutex_lock(&emit_mutex);
1055                                                 flow->next = flows_emit;
1056                                                 flows_emit = flow;
1057 #if ((DEBUG) & DEBUG_I)
1058                                                 emit_queue++;
1059 #endif                          
1060                                                 pthread_mutex_unlock(&emit_mutex);
1061                                                 flow = *flowpp;
1062                                                 continue;
1063                                         }
1064                                 }
1065                                 flowpp = &flow->next;
1066                                 flow = flow->next;
1067                         } /* chain loop */
1068                         pthread_mutex_unlock(&flows_mutex[i]);
1069                 } /* hash loop */
1070                 if (flows_emit) pthread_cond_signal(&emit_cond);
1071
1072                 while (scan_frag_dreg) {
1073                         flow = scan_frag_dreg;
1074                         scan_frag_dreg = flow->next;
1075 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1076                         *logbuf = 0;
1077 #endif
1078                         put_into(flow, MOVE_INTO
1079 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1080                                         , logbuf
1081 #endif
1082                                 );
1083 #if ((DEBUG) & DEBUG_S)
1084                         my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
1085 #endif
1086                 }
1087         }
1088 }
1089
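/* Read packets from the ULOG netlink socket, parse the IP (and, when
 * possible, transport) header into the next free slot of the pending ring
 * and wake unpending_thread(). Packets are dropped when the ring is full. */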
1090 void *cap_thread()
1091 {
1092         struct ulog_packet_msg *ulog_msg;
1093         struct ip *nl;
1094         void *tl;
1095         struct Flow *flow;
1096         int len, off_frag, psize;
1097 #if ((DEBUG) & DEBUG_C)
1098         char buf[64];
1099         char logbuf[256];
1100 #endif
1101         int challenge;
1102
1103         setuser();
1104
1105         while (!killed) {
1106                 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1107                 if (len <= 0) {
1108                         my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1109                         continue;
1110                 }
1111                 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1112
1113 #if ((DEBUG) & DEBUG_C)
1114                         sprintf(logbuf, "C: %d", ulog_msg->data_len);
1115 #endif
1116
1117                         nl = (void *) &ulog_msg->payload;
1118                         psize = ulog_msg->data_len;
1119
1120                         /* Sanity check */
1121                         if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1122 #if ((DEBUG) & DEBUG_C)
1123                                 strcat(logbuf, " U");
1124                                 my_log(LOG_DEBUG, "%s", logbuf);
1125 #endif
1126 #if ((DEBUG) & DEBUG_I)
1127                                 pkts_ignored++;
1128 #endif
1129                                 continue;
1130                         }
1131
1132                         if (pending_head->flags) {
1133 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1134                                 my_log(LOG_ERR,
1135 # if ((DEBUG) & DEBUG_C)
1136                                                 "%s %s %s", logbuf,
1137 # else
1138                                                 "%s %s",
1139 # endif
1140                                                 "pending queue full:", "packet lost");
1141 #endif
1142 #if ((DEBUG) & DEBUG_I)
1143                                 pkts_lost_capture++;
1144 #endif
1145                                 goto done;
1146                         }
1147
1148 #if ((DEBUG) & DEBUG_I)
1149                         pkts_total++;
1150 #endif
1151
1152                         flow = pending_head;
1153
1154                         /* ?FIXME? Add sanity check for ip_len? */
1155                         flow->size = ntohs(nl->ip_len);
1156 #if ((DEBUG) & DEBUG_I)
1157                         size_total += flow->size;
1158 #endif
1159
1160                         flow->sip = nl->ip_src;
1161                         flow->dip = nl->ip_dst;
1162                         flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1163                         
1164                         /* It's going to be expensive calling this syscall on every flow.
1165                          * We should keep a local hash table, for now just bear the overhead... - Sapan*/
1166
1167                         flow->slice_id=0;
1168
1169                         if (ulog_msg->mark > 0) {
1170                                 flow->slice_id = xid_to_slice_id(ulog_msg->mark);
1171                         }
1172
1173                         if (flow->slice_id < 1)
1174                                 flow->slice_id = ulog_msg->mark; // Couldn't look up the slice id, let's at least store the local xid
1175
1176
1177                         if ((flow->dip.s_addr == inet_addr("10.0.0.8")) || (flow->sip.s_addr == inet_addr("10.0.0.8"))) {
1178                                 my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->slice_id);
1179                         }
1180                         flow->iif = snmp_index(ulog_msg->indev_name);
1181                         flow->oif = snmp_index(ulog_msg->outdev_name);
1182                         flow->proto = nl->ip_p;
1183                         flow->id = 0;
1184                         flow->tcp_flags = 0;
1185                         flow->pkts = 1;
1186                         flow->sizeF = 0;
1187                         flow->sizeP = 0;
1188                         /* Packets captured from the OUTPUT chain don't contain a valid timestamp */
1189                         if (ulog_msg->timestamp_sec) {
1190                                 flow->ctime.sec = ulog_msg->timestamp_sec;
1191                                 flow->ctime.usec = ulog_msg->timestamp_usec;
1192                         } else gettime(&flow->ctime);
1193                         flow->mtime = flow->ctime;
1194
1195                         off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1196
1197                         /*
1198                            Offset (from network layer) to transport layer header/IP data
1199                            IOW IP header size ;-)
1200
1201                            ?FIXME?
1202                            Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1203                            */
1204                         off_tl = nl->ip_hl << 2;
1205                         tl = (void *) nl + off_tl;
1206
1207                         /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1208                         flow->sizeF = ntohs(nl->ip_len) - off_tl;
1209                         psize -= off_tl;
1210                         if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1211                         if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1212
1213                         if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1214                                 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1215 #if ((DEBUG) & DEBUG_C)
1216                                 strcat(logbuf, " F");
1217 #endif
1218 #if ((DEBUG) & DEBUG_I)
1219                                 pkts_total_fragmented++;
1220 #endif
1221                                 flow->flags |= FLOW_FRAG;
1222                                 flow->id = nl->ip_id;
1223
1224                                 if (!(ntohs(nl->ip_off) & IP_MF)) {
1225                                         /* The packet without IP_MF (the last fragment) contains the information about the whole datagram size */
1226                                         flow->flags |= FLOW_LASTFRAG;
1227                                         /* size = frag_offset*8 + data_size */
1228                                         flow->sizeP = off_frag + flow->sizeF;
1229                                 }
1230                         }
1231
1232 #if ((DEBUG) & DEBUG_C)
1233                         sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1234                         strcat(logbuf, buf);
1235                         sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1236                         strcat(logbuf, buf);
1237 #endif
1238
1239                         /*
1240                            Fortunately most interesting transport layer information fit
1241                            into first 8 bytes of IP data field (minimal nonzero size).
1242                            Thus we don't need actual packet reassembling to build whole
1243                            transport layer data. We only check the fragment offset for
1244                            zero value to find packet with this information.
1245                            */
1246                         if (!off_frag && psize >= 8) {
1247                                 switch (flow->proto) {
1248                                         case IPPROTO_TCP:
1249                                         case IPPROTO_UDP:
1250                                                 flow->sp = ((struct udphdr *)tl)->uh_sport;
1251                                                 flow->dp = ((struct udphdr *)tl)->uh_dport;
1252                                                 goto tl_known;
1253
1254 #ifdef ICMP_TRICK
1255                                         case IPPROTO_ICMP:
1256                                                 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1257                                                 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1258                                                 goto tl_known;
1259 #endif
1260 #ifdef ICMP_TRICK_CISCO
1261                                         case IPPROTO_ICMP:
1262                                                 flow->dp = *((int32_t *) tl);
1263                                                 goto tl_known;
1264 #endif
1265
1266                                         default:
1267                                                 /* Unknown transport layer */
1268 #if ((DEBUG) & DEBUG_C)
1269                                                 strcat(logbuf, " U");
1270 #endif
1271                                                 flow->sp = 0;
1272                                                 flow->dp = 0;
1273                                                 break;
1274
1275 tl_known:
1276 #if ((DEBUG) & DEBUG_C)
1277                                                 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1278                                                 strcat(logbuf, buf);
1279 #endif
1280                                                 flow->flags |= FLOW_TL;
1281                                 }
1282                         }
1283
1284                         /* Check for tcp flags presence (including CWR and ECE). */
1285                         if (flow->proto == IPPROTO_TCP
1286                                         && off_frag < 16
1287                                         && psize >= 16 - off_frag) {
1288                                 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1289 #if ((DEBUG) & DEBUG_C)
1290                                 sprintf(buf, " TCP:%x", flow->tcp_flags);
1291                                 strcat(logbuf, buf);
1292 #endif
1293                         }
1294
1295 #if ((DEBUG) & DEBUG_C)
1296                         sprintf(buf, " => %x", (unsigned) flow);
1297                         strcat(logbuf, buf);
1298                         my_log(LOG_DEBUG, "%s", logbuf);
1299 #endif
1300
1301 #if ((DEBUG) & DEBUG_I)
1302                         pkts_pending++;
1303                         pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1304                         if (pending_queue_trace < pending_queue_trace_candidate)
1305                                 pending_queue_trace = pending_queue_trace_candidate;
1306 #endif
1307
1308                         /* Flow complete - inform unpending_thread() about it */
1309                         pending_head->flags |= FLOW_PENDING;
1310                         pending_head = pending_head->next;
1311 done:
1312                         pthread_cond_signal(&unpending_cond);
1313                 }
1314         }
1315         return 0;
1316 }
1317
1318 /* Copied out of CoDemux */
1319
1320 static int init_daemon(void) {
1321         pid_t pid;
1322         FILE *pidfile;
1323
1324         pidfile = fopen(PIDFILE, "w");
1325         if (pidfile == NULL) {
1326                 my_log(LOG_ERR, "%s creation failed: %s\n", PIDFILE, strerror(errno));
1327         }
1328
1329         if ((pid = fork()) < 0) {
1330                 if (pidfile) fclose(pidfile);
1331                 my_log(LOG_ERR, "Could not fork!\n");
1332                 return(-1);
1333         }
1334         else if (pid != 0) {
1335                 /* I'm the parent: write down the child pid and exit */
1336                 if (pidfile) fprintf(pidfile, "%ld\n", (long) pid);
1337                 if (pidfile) fclose(pidfile);
1338                 exit(0);
1339         }
1340
1341         /* close the pid file in the child */
1342         if (pidfile) fclose(pidfile);
1343
1344         /* routines for any daemon process:
1345            1. create a new session
1346            2. change the working directory (here /var/local/fprobe)
1347            3. reset the file creation mask
1348            */
1349         setsid();
1350         chdir("/var/local/fprobe");
1351         umask(0);
1352
1353         return(0);
1354 }
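/*
 * Editor's note: init_daemon() performs the first half of the usual
 * daemonisation sequence (fork so the parent can exit after recording the
 * child pid, setsid(), a fixed working directory, umask(0)).  The remaining
 * steps, a second fork and redirecting stdio to /dev/null, happen later in
 * main(), inside the "Crash-proofing" loop.  A condensed sketch of the
 * conventional sequence, for reference only:
 *
 *	if (fork() > 0) exit(0);		parent exits
 *	setsid();				become session leader
 *	if (fork() > 0) exit(0);		drop the controlling terminal
 *	chdir("/"); umask(0);
 *	freopen("/dev/null", "r", stdin);	likewise stdout and stderr
 */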
1355
1356 int main(int argc, char **argv)
1357 {
1358         char errpbuf[512];
1359         char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1360         int c, i, write_fd, memory_limit = 0;
1361         struct addrinfo hints, *res;
1362         struct sockaddr_in saddr;
1363         pthread_attr_t tattr;
1364         struct sigaction sigact;
1365         static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1366         struct timeval timeout;
1367
1368         sched_min = sched_get_priority_min(SCHED);
1369         sched_max = sched_get_priority_max(SCHED);
1370
1371         memset(&saddr, 0 , sizeof(saddr));
1372         memset(&hints, 0 , sizeof(hints));
1373         hints.ai_flags = AI_PASSIVE;
1374         hints.ai_family = AF_INET;
1375         hints.ai_socktype = SOCK_DGRAM;
1376
1377         /* Process command line options */
1378
1379         opterr = 0;
1380         while ((c = my_getopt(argc, argv, parms)) != -1) {
1381                 switch (c) {
1382                         case '?':
1383                                 usage();
1384
1385                         case 'h':
1386                                 usage();
1387                 }
1388         }
1389
1390         if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1391         if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1392         if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1393         if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1394         if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1395         if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1396         if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1397         if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1398         if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1399         if (parms[nflag].count) {
1400                 switch (atoi(parms[nflag].arg)) {
1401                         case 1:
1402                                 netflow = &NetFlow1;
1403                                 break;
1404
1405                         case 5:
1406                                 break;
1407
1408                         case 7:
1409                                 netflow = &NetFlow7;
1410                                 break;
1411
1412                         default:
1413                                 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1414                                 exit(1);
1415                 }
1416         }
1417         if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1418         if (parms[lflag].count) {
1419                 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1420                         *log_suffix++ = 0;
1421                         if (*log_suffix) {
1422                                 sprintf(errpbuf, "[%s]", log_suffix);
1423                                 strcat(ident, errpbuf);
1424                         }
1425                 }
1426                 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1427                 if (log_suffix) *--log_suffix = ':';
1428         }
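	/*
	 * Editor's note: the -l argument handled above has the form
	 * "<destination>[:<suffix>]".  A hypothetical "-l 2:node1" would set
	 * log_dest to 2 and append "[node1]" to the log ident; the ':' is put
	 * back afterwards so the option can later be logged verbatim.
	 */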
1429         if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1430 err_malloc:
1431                 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1432                 exit(1);
1433         }
1434         sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1435         if (parms[qflag].count) {
1436                 pending_queue_length = atoi(parms[qflag].arg);
1437                 if (pending_queue_length < 1) {
1438                         fprintf(stderr, "Illegal %s\n", "pending queue length");
1439                         exit(1);
1440                 }
1441         }
1442         if (parms[rflag].count) {
1443                 schedp.sched_priority = atoi(parms[rflag].arg);
1444                 if (schedp.sched_priority
1445                                 && (schedp.sched_priority < sched_min
1446                                         || schedp.sched_priority > sched_max)) {
1447                         fprintf(stderr, "Illegal %s\n", "realtime priority");
1448                         exit(1);
1449                 }
1450         }
1451         if (parms[Bflag].count) {
1452                 sockbufsize = atoi(parms[Bflag].arg) << 10;
1453         }
1454         if (parms[bflag].count) {
1455                 bulk_quantity = atoi(parms[bflag].arg);
1456                 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1457                         fprintf(stderr, "Illegal %s\n", "bulk size");
1458                         exit(1);
1459                 }
1460         }
1461         if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1462         if (parms[Xflag].count) {
1463                 for(i = 0; parms[Xflag].arg[i]; i++)
1464                         if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1465                 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1466                         goto err_malloc;
1467                 rule = strtok(parms[Xflag].arg, ":");
1468                 for (i = 0; rule; i++) {
1469                         snmp_rules[i].len = strlen(rule);
1470                         if (snmp_rules[i].len > IFNAMSIZ) {
1471                                 fprintf(stderr, "Illegal %s\n", "interface basename");
1472                                 exit(1);
1473                         }
1474                         strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1475                         if (!*(rule - 1)) *(rule - 1) = ',';
1476                         rule = strtok(NULL, ",");
1477                         if (!rule) {
1478                                 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1479                                 exit(1);
1480                         }
1481                         snmp_rules[i].base = atoi(rule);
1482                         *(rule - 1) = ':';
1483                         rule = strtok(NULL, ":");
1484                 }
1485                 nsnmp_rules = i;
1486         }
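	/*
	 * Editor's note: judging from the parsing above, the -X argument is a
	 * comma-separated list of "ifbasename:base" pairs, e.g. a hypothetical
	 * "-X eth:1,ppp:200".  Each pair becomes one snmp_rule (interface
	 * basename plus numeric SNMP ifIndex base), and the separators are
	 * written back so the original string survives for the options log.
	 */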
1487         if (parms[tflag].count)
1488                 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1489         if (parms[aflag].count) {
1490                 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1491 bad_lhost:
1492                         fprintf(stderr, "Illegal %s\n", "source address");
1493                         exit(1);
1494                 } else {
1495                         saddr = *((struct sockaddr_in *) res->ai_addr);
1496                         freeaddrinfo(res);
1497                 }
1498         }
1499         if (parms[uflag].count) 
1500                 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1501                         fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1502                         exit(1);
1503                 }
1504
1505
1506         /* Process collectors parameters. Brrrr... :-[ */
1507
1508         npeers = argc - optind;
1509         if (npeers >= 1) {
1510                 /* Send to remote Netflow collector */
1511                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1512                 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1513                         dhost = argv[i];
1514                         if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1515                         *dport++ = 0;
1516                         if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1517                                 fprintf(stderr, "socket(): %s\n", strerror(errno));
1518                                 exit(1);
1519                         }
1520                         peers[npeers].write_fd = write_fd;
1521                         peers[npeers].type = PEER_MIRROR;
1522                         peers[npeers].laddr = saddr;
1523                         peers[npeers].seq = 0;
1524                         if ((lhost = strchr(dport, '/'))) {
1525                                 *lhost++ = 0;
1526                                 if ((type = strchr(lhost, '/'))) {
1527                                         *type++ = 0;
1528                                         switch (*type) {
1529                                                 case 0:
1530                                                 case 'm':
1531                                                         break;
1532
1533                                                 case 'r':
1534                                                         peers[npeers].type = PEER_ROTATE;
1535                                                         npeers_rot++;
1536                                                         break;
1537
1538                                                 default:
1539                                                         goto bad_collector;
1540                                         }
1541                                 }
1542                                 if (*lhost) {
1543                                         if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1544                                         peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1545                                         freeaddrinfo(res);
1546                                 }
1547                         }
1548                         if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1549                                                 sizeof(struct sockaddr_in))) {
1550                                 fprintf(stderr, "bind(): %s\n", strerror(errno));
1551                                 exit(1);
1552                         }
1553                         if (getaddrinfo(dhost, dport, &hints, &res)) {
1554 bad_collector:
1555                                 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1556                                 exit(1);
1557                         }
1558                         peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1559                         freeaddrinfo(res);
1560                         if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1561                                                 sizeof(struct sockaddr_in))) {
1562                                 fprintf(stderr, "connect(): %s\n", strerror(errno));
1563                                 exit(1);
1564                         }
1565
1566                         /* Restore command line */
1567                         if (type) { *--type = '/'; type = 0; }
1568                         if (lhost) *--lhost = '/';
1569                         *--dport = ':';
1570                 }
1571         }
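	/*
	 * Editor's note: each remaining command-line argument is parsed above as
	 * "host:port[/localaddr[/type]]", where an empty type or 'm' selects
	 * PEER_MIRROR and 'r' selects PEER_ROTATE; e.g. a hypothetical
	 * "fprobe-ulog 10.0.0.1:2055/0.0.0.0/r".  A UDP socket is bound to the
	 * local address, connect()ed to the collector, and the separators are
	 * restored so argv stays readable.
	 */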
1572         else if (parms[fflag].count) {
1573                 // log into a file
1574                 if (!(peers = malloc((npeers + 1) * sizeof(struct peer)))) goto err_malloc;
1575                 if (!(peers[npeers].fname = calloc(1, strnlen(parms[fflag].arg, MAX_PATH_LEN) + 1))) goto err_malloc;
1576                 strncpy(peers[npeers].fname, parms[fflag].arg, strnlen(parms[fflag].arg, MAX_PATH_LEN));
1577
1578                 peers[npeers].write_fd = START_DATA_FD;
1579                 peers[npeers].type = PEER_FILE;
1580                 peers[npeers].seq = 0;
1581
1582                 read_cur_epoch();
1583                 npeers++;
1584         }
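	/*
	 * Editor's note: with -f the single "peer" is a local file (PEER_FILE);
	 * peers[npeers].fname holds the target path.  read_cur_epoch()
	 * presumably restores the rotation epoch of the previous run, so that
	 * file rotation (governed by the -W/-T/-E options parsed earlier)
	 * continues where it left off.
	 */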
1585         else 
1586                 usage();
1587
1588
1589         if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1590         ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1591         if (!ulog_handle) {
1592                 fprintf(stderr, "libipulog initialization error: %s\n",
1593                                 ipulog_strerror(ipulog_errno));
1594                 exit(1);
1595         }
1596         if (sockbufsize)
1597                 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1598                                         &sockbufsize, sizeof(sockbufsize)) < 0)
1599                         fprintf(stderr, "setsockopt(): %s\n", strerror(errno));
1600
1601         /* Daemonize (if log destination stdout-free) */
1602
1603         my_log_open(ident, verbosity, log_dest);
1604
1605         init_daemon();
1606
1607         if (!(log_dest & 2)) {
1608                 /* Crash-proofing - Sapan */
1609                 while (1) {
1610                         pid_t pid = fork();
1611                         if (pid == -1) {
1612                                 fprintf(stderr, "fork(): %s\n", strerror(errno));
1613                                 exit(1);
1614                         }
1615                         else if (pid == 0) {
1616                                 setsid();
1617                                 freopen("/dev/null", "r", stdin);
1618                                 freopen("/dev/null", "w", stdout);
1619                                 freopen("/dev/null", "w", stderr);
1620                                 break;
1621                         }
1622                         else {
1623                                 while (wait3(NULL,0,NULL) < 1);
1624                         }
1625                 }
1626         } else {
1627                 setvbuf(stdout, (char *)0, _IONBF, 0);
1628                 setvbuf(stderr, (char *)0, _IONBF, 0);
1629         }
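	/*
	 * Editor's note: the block above is a small supervisor.  When logging
	 * does not go to stdout, the original process loops forever: it forks a
	 * worker, blocks in wait3() until that worker dies, then forks a
	 * replacement.  Only the worker (the child that breaks out of the loop)
	 * calls setsid(), redirects stdio to /dev/null and goes on to run the
	 * collector proper.  With stdout logging (log_dest & 2) the supervision
	 * is skipped and the streams are simply switched to unbuffered mode.
	 */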
1630
1631         pid = getpid();
1632         sprintf(errpbuf, "[%ld]", (long) pid);
1633         strcat(ident, errpbuf);
1634
1635         /* Initialization */
1636
1637         init_slice_id_hash();
1638         hash_init(); /* Actually for crc16 only */
1639         mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1640         for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1641
1642 #ifdef UPTIME_TRICK
1643         /* Hope 12 days is enough :-/ */
1644         start_time_offset = 1 << 20;
1645
1646         /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1647 #endif
1648         gettime(&start_time);
1649
1650         /*
1651            Build static pending queue as circular buffer.
1652            */
1653         if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1654         pending_tail = pending_head;
1655         for (i = pending_queue_length - 1; i--;) {
1656                 if (!(pending_tail->next = mem_alloc())) {
1657 err_mem_alloc:
1658                         my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1659                         exit(1);
1660                 }
1661                 pending_tail = pending_tail->next;
1662         }
1663         pending_tail->next = pending_head;
1664         pending_tail = pending_head;
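	/*
	 * Editor's note: the loop above links pending_queue_length preallocated
	 * struct Flow nodes into a ring.  The capture path fills the node at
	 * pending_head, marks it FLOW_PENDING and advances, while
	 * unpending_thread() drains from pending_tail.  A rough sketch of the
	 * consumer side, for illustration only (the real logic lives in
	 * unpending_thread()):
	 */
#if 0
	while (pending_tail->flags & FLOW_PENDING) {
		/* ... fold *pending_tail into the flow hash ... */
		pending_tail->flags &= ~FLOW_PENDING;
		pending_tail = pending_tail->next;
	}
#endif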
1665
1666         sigemptyset(&sig_mask);
1667         sigact.sa_handler = &sighandler;
1668         sigact.sa_mask = sig_mask;
1669         sigact.sa_flags = 0;
1670         sigaddset(&sig_mask, SIGTERM);
1671         sigaction(SIGTERM, &sigact, 0);
1672 #if ((DEBUG) & DEBUG_I)
1673         sigaddset(&sig_mask, SIGUSR1);
1674         sigaction(SIGUSR1, &sigact, 0);
1675 #endif
1676         if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1677                 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1678                 exit(1);
1679         }
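	/*
	 * Editor's note: SIGTERM (and SIGUSR1 in DEBUG_I builds) is blocked
	 * here, before the worker threads are created below, so every worker
	 * inherits the blocked mask.  Only after the workers are running does
	 * main() call pthread_sigmask(SIG_UNBLOCK, ...), making the main thread
	 * the sole receiver; sighandler() apparently just records the signal in
	 * `sigs` for the select() loop at the bottom of main().
	 */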
1680
1681         my_log(LOG_INFO, "Starting %s...", VERSION);
1682
1683         if (parms[cflag].count) {
1684                 if (chdir(parms[cflag].arg) || chroot(".")) {
1685                         my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1686                         exit(1);
1687                 }
1688         }
1689
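	/*
	 * Editor's note (an assumption, not taken from the original source): on
	 * Linux/NPTL the scheduling attributes set on tattr below are honoured
	 * only when explicit scheduling is requested as well, so a call such as
	 * the one sketched here, placed before the pthread_create() calls, may
	 * be needed for the -r option to have any effect:
	 */
#if 0
	pthread_attr_setinheritsched(&tattr, PTHREAD_EXPLICIT_SCHED);
#endif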
1690         schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1691         pthread_attr_init(&tattr);
1692         for (i = 0; i < THREADS - 1; i++) {
1693                 if (schedp.sched_priority > 0) {
1694                         if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1695                                         (pthread_attr_setschedparam(&tattr, &schedp))) {
1696                                 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1697                                 exit(1);
1698                         }
1699                 }
1700                 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1701                         my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1702                         exit(1);
1703                 }
1704                 pthread_detach(thid);
1705                 schedp.sched_priority++;
1706         }
1707
1708         if (pw) {
1709                 if (setgroups(0, NULL)) {
1710                         my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1711                         exit(1);
1712                 }
1713                 if (setregid(pw->pw_gid, pw->pw_gid)) {
1714                         my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1715                         exit(1);
1716                 }
1717                 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1718                         my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1719                         exit(1);
1720                 }
1721         }
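	/*
	 * Editor's note: the order above matters; supplementary groups are
	 * cleared and the gid changed while the process still has root
	 * privileges, and the uid is dropped last.  A common follow-up check,
	 * sketched here purely as an illustration:
	 */
#if 0
	if (pw && (setuid(0) != -1 || setgid(0) != -1)) {
		my_log(LOG_CRIT, "privilege drop could be reverted, aborting");
		exit(1);
	}
#endif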
1722
1723         if (!(pidfile = fopen(pidfilepath, "w")))
1724                 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1725         else {
1726                 fprintf(pidfile, "%ld\n", (long) pid);
1727                 fclose(pidfile);
1728         }
1729
1730         my_log(LOG_INFO, "pid: %d", pid);
1731         my_log(LOG_INFO, "options: U=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1732                         "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1733                         ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1734                         netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1735                         memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1736                         emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1737                         parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1738         for (i = 0; i < nsnmp_rules; i++) {
1739                 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1740                                 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1741         }
1742         for (i = 0; i < npeers; i++) {
1743                 switch (peers[i].type) {
1744                         case PEER_MIRROR: c = 'm'; break;
1745                         case PEER_ROTATE: c = 'r'; break;
1746                         case PEER_FILE:
1747                                 my_log(LOG_INFO, "collector #%d: file %s", i + 1, peers[i].fname);
1748                                 continue;
1749                         default: c = '?'; break;
1750                 }
1751                 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1752                 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1753                                 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1754         }
1755
1756         pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1757
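	/*
	 * Editor's note: the loop below doubles as the steady-state sleep loop
	 * and the shutdown drain.  Once SIGTERM sets `killed`, the lifetimes are
	 * forced down so the cache is flushed, and the loop keeps running until
	 * no flows remain in flight (total_elements - free_elements -
	 * pending_queue_length reaches zero), nothing is left to emit and the
	 * pending ring is empty.
	 */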
1758         timeout.tv_usec = 0;
1759         while (!killed
1760                         || (total_elements - free_elements - pending_queue_length)
1761                         || emit_count
1762                         || pending_tail->flags) {
1763
1764                 if (!sigs) {
1765                         timeout.tv_sec = scan_interval;
1766                         select(0, 0, 0, 0, &timeout);
1767                 }
1768
1769                 if (sigs & SIGTERM_MASK && !killed) {
1770                         sigs &= ~SIGTERM_MASK;
1771                         my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1772                         scan_interval = 1;
1773                         frag_lifetime = -1;
1774                         active_lifetime = -1;
1775                         inactive_lifetime = -1;
1776                         emit_timeout = 1;
1777                         unpending_timeout = 1;
1778                         killed = 1;
1779                         pthread_cond_signal(&scan_cond);
1780                         pthread_cond_signal(&unpending_cond);
1781                 }
1782
1783 #if ((DEBUG) & DEBUG_I)
1784                 if (sigs & SIGUSR1_MASK) {
1785                         sigs &= ~SIGUSR1_MASK;
1786                         info_debug();
1787                 }
1788 #endif
1789         }
1790         remove(pidfilepath);
1791 #if ((DEBUG) & DEBUG_I)
1792         info_debug();
1793 #endif
1794         my_log(LOG_INFO, "Done.");
1795 #ifdef WALL
1796         return 0;
1797 #endif
1798 }