Bug fix.
[fprobe-ulog.git] / src / fprobe-ulog.c
1 /*
2         Copyright (C) Slava Astashonok <sla@0n.ru>
3
4         This program is free software; you can redistribute it and/or
5         modify it under the terms of the GNU General Public License.
6
7         $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
8
9                         Sapan Bhatia <sapanb@cs.princeton.edu> 
10                         
11         7/11/2007       Added data collection (-f) functionality, xid support in the header and log file
12                         rotation.
13
14         15/11/2007      Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
15
16 */
17
18 #include <common.h>
19
20 /* stdout, stderr, freopen() */
21 #include <stdio.h>
22
23 /* atoi(), exit() */
24 #include <stdlib.h>
25
/* getopt(), alarm(), getpid(), setsid(), chdir() */
27 #include <unistd.h>
28
29 /* strerror() */
30 #include <string.h>
31
32 /* sig*() */
33 #include <signal.h>
34
35 /* statfs() */
36
37 #include <sys/vfs.h>
38
39 #include <libipulog/libipulog.h>
40 #include "vserver.h"
41
/*
 * Local re-declaration of libipulog's handle structure (ulog_handle below is
 * a pointer to it).  NOTE(review): presumably duplicated because
 * <libipulog/libipulog.h> does not expose the members — confirm.  The layout
 * must match the libipulog being linked against member for member, so do not
 * reorder, remove or add fields.
 */
struct ipulog_handle {
	int fd;
	u_int8_t blocking;
	struct sockaddr_nl local;
	struct sockaddr_nl peer;
	struct nlmsghdr* last_nlhdr;
};
49
50 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
51 #include <sys/types.h>
52 #include <netinet/in_systm.h>
53 #include <sys/socket.h>
54 #include <netinet/in.h>
55 #include <arpa/inet.h>
56 #include <netinet/ip.h>
57 #include <netinet/tcp.h>
58 #include <netinet/udp.h>
59 #include <netinet/ip_icmp.h>
60 #include <net/if.h>
61
62 #include <sys/param.h>
63 #include <pwd.h>
64 #ifdef OS_LINUX
65 #include <grp.h>
66 #endif
67
68 /* pthread_*() */
69 #include <pthread.h>
70
71 /* errno */
72 #include <errno.h>
73
74 /* getaddrinfo() */
75 #include <netdb.h>
76
77 /* nanosleep() */
78 #include <time.h>
79
80 /* gettimeofday() */
81 #include <sys/time.h>
82
83 /* scheduling */
84 #include <sched.h>
85
86 /* select() (POSIX)*/
87 #include <sys/select.h>
88
89 /* open() */
90 #include <sys/stat.h>
91 #include <fcntl.h>
92
93 #include <fprobe-ulog.h>
94 #include <my_log.h>
95 #include <my_getopt.h>
96 #include <netflow.h>
97 #include <hash.h>
98 #include <mem.h>
99
/* Default pid file path (NOTE(review): pidfilepath may override it — confirm in main()). */
#define PIDFILE			"/var/log/fprobe-ulog.pid"
/* Text file holding the last epoch index; written by update_cur_epoch_file(),
   read back by read_cur_epoch(). */
#define LAST_EPOCH_FILE		"/var/log/fprobe_last_epoch"
/* Buffer size for an epoch number as text: up to 5 digits plus the NUL. */
#define MAX_EPOCH_SIZE		sizeof("32767")
/* When defined, emit_thread() pads each PDU up to NETFLOW_PDU_SIZE bytes. */
#define STD_NETFLOW_PDU
104
/*
 * Option indices into parms[] (e.g. mark_is_tos expands to
 * parms[Mflag].count).  Entry N of parms[] must describe the option whose
 * index here is N, so keep both lists in the same order.
 */
enum {
	aflag, Bflag, bflag, cflag, dflag,	/*  0 ..  4 */
	Dflag, eflag, Eflag, fflag, gflag,	/*  5 ..  9 */
	hflag, lflag, mflag, Mflag, nflag,	/* 10 .. 14 */
	qflag, rflag, sflag, tflag, Tflag,	/* 15 .. 19 */
	Uflag, uflag, vflag, Wflag, Xflag	/* 20 .. 24 */
};
132
133 static struct getopt_parms parms[] = {
134         {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
135         {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
136         {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
137         {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
138         {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
139         {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
140         {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
141         {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
142         {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
143         {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
144         {'h', 0, 0, 0},
145         {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
146         {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
147         {'M', 0, 0, 0},
148         {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
149         {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
150         {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
151         {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
152         {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
153         {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
154         {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
155         {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
156         {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
157         {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
158         {0, 0, 0, 0}
159 };
160
/* getopt() state (also declared by <unistd.h>). */
extern char *optarg;
extern int optind, opterr, optopt;
/*
 * errno is deliberately NOT declared here: <errno.h> provides it, and
 * declaring it yourself is undefined behaviour (C11 7.5p2).  It also breaks
 * on systems where errno is a per-thread macro.
 */

/* NetFlow format descriptors defined in another translation unit. */
extern struct NetFlow NetFlow1;
extern struct NetFlow NetFlow5;
extern struct NetFlow NetFlow7;
168
/* Sentinel write_fd value meaning "no data file opened yet" (see get_data_file_fd()). */
#define START_DATA_FD -5
/* Non-zero when -M was given: use the netfilter mark as the ToS value. */
#define mark_is_tos parms[Mflag].count
/*
 * Run-time tunables.  The defaults match those printed by usage(); the
 * option each one is bound to is inferred from that and should be
 * confirmed in main().
 */
static unsigned scan_interval = 5;	/* presumably -s: seconds between expiry scans */
static unsigned int min_free = 0;	/* presumably -D: free blocks to keep; 0 disables the check */
static int frag_lifetime = 30;		/* presumably -g: fragmented flow lifetime, seconds */
static int inactive_lifetime = 60;	/* NOTE(review): no matching option is listed in usage() */
static int active_lifetime = 300;	/* presumably -e: active flow lifetime, seconds */
static int sockbufsize;			/* presumably -B: kernel capture buffer size */
/* Largest value a mem_index_t can represent; upper bound for -b. */
#define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
/* Default memory bulk size (-b), chosen by MEM_BITS. */
#if (MEM_BITS == 0) || (MEM_BITS == 16)
#define BULK_QUANTITY 10000
#else
#define BULK_QUANTITY 200
#endif

/*
 * Data-file rotation state, maintained by get_data_file_fd():
 *   epoch_length - epoch length in minutes (compared with getuptime_minutes())
 *   log_epochs   - number of data files rotated through (cur_epoch wraps modulo it)
 *   cur_epoch    - index of the current data file; -1 forces a rotation
 *                  that wraps back to epoch 0
 *   prev_uptime  - uptime in minutes at which the current epoch started
 *   last_peak    - highest epoch index reached so far
 */
static unsigned epoch_length=60, log_epochs=1; 
static unsigned cur_epoch=0,prev_uptime=0,last_peak=0;

static unsigned bulk_quantity = BULK_QUANTITY;
static unsigned pending_queue_length = 100;
/* NetFlow format currently in use; version 5 by default. */
static struct NetFlow *netflow = &NetFlow5;
static unsigned verbosity = 6;
static unsigned log_dest = MY_LOG_SYSLOG;
/* Reference instant for getuptime() / getuptime_minutes(). */
static struct Time start_time;
static long start_time_offset;
static int off_tl;
/* From mem.c */
extern unsigned total_elements;
extern unsigned free_elements;
extern unsigned total_memory;
/* Statistics counters, compiled only with DEBUG_I (reported by info_debug()). */
#if ((DEBUG) & DEBUG_I)
static unsigned emit_pkts, emit_queue;
static uint64_t size_total;
static unsigned pkts_total, pkts_total_fragmented;
static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
static unsigned pkts_pending, pkts_pending_done;
static unsigned pending_queue_trace, pending_queue_trace_candidate;
static unsigned flows_total, flows_fragmented;
#endif
/* Export state: flows packed so far, sequence, rate limiting, output buffer. */
static unsigned emit_count;
static uint32_t emit_sequence;
static unsigned emit_rate_bytes, emit_rate_delay;
static struct Time emit_time;
static uint8_t emit_packet[NETFLOW_MAX_PACKET];
static pthread_t thid;
static sigset_t sig_mask;
static struct sched_param schedp;
static int sched_min, sched_max;
static int npeers, npeers_rot;
static struct peer *peers;
/* Bit set of pending signals, filled in by sighandler(). */
static int sigs;

/* Flow hash table: chain flows[h] is guarded by flows_mutex[h] (see put_into()). */
static struct Flow *flows[1 << HASH_BITS];
static pthread_mutex_t flows_mutex[1 << HASH_BITS];

static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;

static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
static struct Flow *pending_head, *pending_tail;
static struct Flow *scan_frag_dreg;

/* Queue of flows awaiting export; emit_thread() waits on emit_cond under emit_mutex. */
static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
static struct Flow *flows_emit;

static char ident[256] = "fprobe-ulog";
static FILE *pidfile;
static char *pidfilepath;
static pid_t pid;
static int killed;
static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
static struct ipulog_handle *ulog_handle;
static uint32_t ulog_gmask = 1;
static char *cap_buf;
/* Interface-name to SNMP-index rules used by snmp_index(). */
static int nsnmp_rules;
static struct snmp_rule *snmp_rules;
/* Account setuser() switches to; 0 means keep the current identity. */
static struct passwd *pw = 0;
248
249 void usage()
250 {
251         fprintf(stdout,
252                         "fprobe-ulog: a NetFlow probe. Version %s\n"
253                         "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
254                         "\n"
255                         "-h\t\tDisplay this help\n"
256                         "-U <mask>\tULOG group bitwise mask [1]\n"
257                         "-s <seconds>\tHow often scan for expired flows [5]\n"
258                         "-g <seconds>\tFragmented flow lifetime [30]\n"
259                         "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
260                         "-f <filename>\tLog flow data in a file\n"
261                         "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
262                         "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
263                         "-a <address>\tUse <address> as source for NetFlow flow\n"
264                         "-X <rules>\tInterface name to SNMP-index conversion rules\n"
265                         "-M\t\tUse netfilter mark value as ToS flag\n"
266                         "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
267                         "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
268                         "-q <flows>\tPending queue length [100]\n"
269                         "-B <kilobytes>\tKernel capture buffer size [0]\n"
270                         "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
271                         "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
272                         "-c <directory>\tDirectory to chroot to\n"
273                         "-u <user>\tUser to run as\n"
274                         "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
275                         "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
276                         "-y <remote:port>\tAddress of the NetFlow collector\n"
277                         "-f <writable file>\tFile to write data into\n"
278                         "-T <n>\tRotate log file every n epochs\n"
279                         "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
280                         "-E <[1..60]>\tSize of an epoch in minutes\n"
281                         "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
282                         ,
283                 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
284         exit(0);
285 }
286
#if ((DEBUG) & DEBUG_I)
/*
 * Log the DEBUG_I statistics counters at LOG_DEBUG.
 *
 * All counters are unsigned and size_total is uint64_t.  They are
 * therefore printed with %u, and with %llu after an explicit cast; passing
 * them to %d / %lld was a format/argument mismatch.
 */
void info_debug()
{
	my_log(LOG_DEBUG, "I: received:%u/%u (%llu) pending:%u/%u",
			pkts_total, pkts_total_fragmented,
			(unsigned long long) size_total,
			pkts_pending - pkts_pending_done, pending_queue_trace);
	my_log(LOG_DEBUG, "I: ignored:%u lost:%u+%u",
			pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
	my_log(LOG_DEBUG, "I: cache:%u/%u emit:%u/%u/%u",
			flows_total, flows_fragmented, (unsigned) emit_sequence,
			emit_pkts, emit_queue);
	my_log(LOG_DEBUG, "I: memory:%u/%u (%u)",
			total_elements, free_elements, total_memory);
}
#endif
301
302 void sighandler(int sig)
303 {
304         switch (sig) {
305                 case SIGTERM:
306                         sigs |= SIGTERM_MASK;
307                         break;
308 #if ((DEBUG) & DEBUG_I)
309                 case SIGUSR1:
310                         sigs |= SIGUSR1_MASK;
311                         break;
312 #endif
313         }
314 }
315
316 void gettime(struct Time *now)
317 {
318         struct timeval t;
319
320         gettimeofday(&t, 0);
321         now->sec = t.tv_sec;
322         now->usec = t.tv_usec;
323 }
324
325
326 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
327 {
328         return (t1->sec - t2->sec)/60;
329 }
330
331 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
332 {
333         return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
334 }
335
336 /* Uptime in miliseconds */
337 uint32_t getuptime(struct Time *t)
338 {
339         /* Maximum uptime is about 49/2 days */
340         return cmpmtime(t, &start_time);
341 }
342
343 /* Uptime in minutes */
344 uint32_t getuptime_minutes(struct Time *t)
345 {
346         /* Maximum uptime is about 49/2 days */
347         return cmpMtime(t, &start_time);
348 }
349
350 hash_t hash_flow(struct Flow *flow)
351 {
352         if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
353         else return hash(flow, sizeof(struct Flow_TL));
354 }
355
356 uint16_t snmp_index(char *name) {
357         uint32_t i;
358
359         if (!*name) return 0;
360
361         for (i = 0; (int) i < nsnmp_rules; i++) {
362                 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
363                 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
364         }
365
366         if ((i = if_nametoindex(name))) return i;
367
368         return -1;
369 }
370
371 inline void copy_flow(struct Flow *src, struct Flow *dst)
372 {
373         dst->iif = src->iif;
374         dst->oif = src->oif;
375         dst->sip = src->sip;
376         dst->dip = src->dip;
377         dst->tos = src->tos;
378         dst->xid = src->xid;
379         dst->proto = src->proto;
380         dst->tcp_flags = src->tcp_flags;
381         dst->id = src->id;
382         dst->sp = src->sp;
383         dst->dp = src->dp;
384         dst->pkts = src->pkts;
385         dst->size = src->size;
386         dst->sizeF = src->sizeF;
387         dst->sizeP = src->sizeP;
388         dst->ctime = src->ctime;
389         dst->mtime = src->mtime;
390         dst->flags = src->flags;
391 }
392
393 void read_cur_epoch() {
394         int fd;
395         /* Reset to -1 in case the read fails */
396         cur_epoch=-1;
397         fd = open(LAST_EPOCH_FILE, O_RDONLY);
398         if (fd != -1) {
399                 char snum[MAX_EPOCH_SIZE];
400                 ssize_t len;
401                 len = read(fd, snum, MAX_EPOCH_SIZE-1);
402                 if (len != -1) {
403                         snum[len]='\0';
404                         sscanf(snum,"%d",&cur_epoch);
405                         cur_epoch++; /* Let's not stone the last epoch */
406                         close(fd);
407                 }
408         }
409         return;
410 }
411
412
413 /* Dumps the current epoch in a file to cope with
414  * reboots and killings of fprobe */
415
416 void update_cur_epoch_file(int n) {
417         int fd, len;
418         char snum[MAX_EPOCH_SIZE];
419         len=snprintf(snum, MAX_EPOCH_SIZE-1,"%d", n);
420         fd = open(LAST_EPOCH_FILE, O_RDWR|O_CREAT|O_TRUNC);
421         if (fd == -1) {
422                 my_log(LOG_ERR, "open() failed: %s.The next restart will resume logging from epoch id 0.",LAST_EPOCH_FILE);
423                 return;
424         }
425         write(fd, snum, len);
426         close(fd);
427 }
428
429 /* Get the file descriptor corresponding to the current file.
430  * The kludgy implementation is to abstract away the 'current
431  * file descriptor', which may also be a socket. 
432  */
433
434 unsigned get_data_file_fd(char *fname, int cur_fd) {
435         struct Time now;
436         unsigned cur_uptime;
437
438         struct statfs statfs;
439         int ret_fd;
440
441         /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
442          * doesn't solve the problem */
443         gettime(&now);
444         cur_uptime = getuptime_minutes(&now);
445
446         if (cur_fd != START_DATA_FD) {
447                 if (fstatfs(cur_fd, &statfs) == -1) {
448                         my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
449                 }
450                 else {
451                         if (min_free && (statfs.f_bavail < min_free)
452                                         && (cur_epoch==last_peak))
453                         {
454                                 my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
455                                 cur_epoch = -1;
456                         }
457                         /*
458                            else 
459                            assume that we can reclaim space by overwriting our own files
460                            and that the difference in size will not fill the disk - sapan
461                            */
462                 }
463         }
464
465         /* If epoch length has been exceeded, 
466          * or we're starting up 
467          * or we're going back to the first epoch */
468         if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
469                 char nextname[MAX_PATH_LEN];
470                 int write_fd;
471                 prev_uptime = cur_uptime;
472                 cur_epoch = (cur_epoch + 1) % log_epochs;
473                 if (cur_epoch>last_peak) last_peak = cur_epoch;
474                 if (cur_fd>0)
475                         close(cur_fd);
476                 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
477                 if ((write_fd = open(nextname, O_RDWR|O_CREAT|O_TRUNC)) < 0) {
478                         my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
479                         exit(1);
480                 }
481                 if (fchmod(write_fd,S_IRUSR|S_IWUSR|S_IROTH|S_IRGRP) == -1) {
482                         my_log(LOG_ERR, "fchmod() failed: %s (%s). Continuing...\n", nextname, strerror(errno));
483                 }
484                 update_cur_epoch_file(cur_epoch);
485                 ret_fd = write_fd;
486         }
487         else {
488                 ret_fd = cur_fd;
489         }
490         return(ret_fd);
491 }
492
493 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
494 {
495         struct Flow **flowpp;
496
497 #ifdef WALL
498         flowpp = 0;
499 #endif
500
501         if (prev) flowpp = *prev;
502
503         while (where) {
504                 if (where->sip.s_addr == what->sip.s_addr
505                                 && where->dip.s_addr == what->dip.s_addr
506                                 && where->proto == what->proto) {
507                         switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
508                                 case 0:
509                                         /* Both unfragmented */
510                                         if ((what->sp == where->sp)
511                                                         && (what->dp == where->dp)) goto done;
512                                         break;
513                                 case 2:
514                                         /* Both fragmented */
515                                         if (where->id == what->id) goto done;
516                                         break;
517                         }
518                 }
519                 flowpp = &where->next;
520                 where = where->next;
521         }
522 done:
523         if (prev) *prev = flowpp;
524         return where;
525 }
526
527         int put_into(struct Flow *flow, int flag
528 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
529                         , char *logbuf
530 #endif
531                     )
532 {
533         int ret = 0;
534         hash_t h;
535         struct Flow *flown, **flowpp;
536 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
537         char buf[64];
538 #endif
539
540         h = hash_flow(flow);
541 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
542         sprintf(buf, " %x H:%04x", (unsigned) flow, h);
543         strcat(logbuf, buf);
544 #endif
545         pthread_mutex_lock(&flows_mutex[h]);
546         flowpp = &flows[h];
547         if (!(flown = find(flows[h], flow, &flowpp))) {
548                 /* No suitable flow found - add */
549                 if (flag == COPY_INTO) {
550                         if ((flown = mem_alloc())) {
551                                 copy_flow(flow, flown);
552                                 flow = flown;
553                         } else {
554 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
555                                 my_log(LOG_ERR, "%s %s. %s",
556                                                 "mem_alloc():", strerror(errno), "packet lost");
557 #endif
558                                 return -1;
559                         }
560                 }
561                 flow->next = flows[h];
562                 flows[h] = flow;
563 #if ((DEBUG) & DEBUG_I)
564                 flows_total++;
565                 if (flow->flags & FLOW_FRAG) flows_fragmented++;
566 #endif
567 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
568                 if (flown) {
569                         sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
570                         strcat(logbuf, buf);
571                 }
572 #endif
573         } else {
574                 /* Found suitable flow - update */
575 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
576                 sprintf(buf, " +> %x", (unsigned) flown);
577                 strcat(logbuf, buf);
578 #endif
579                 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
580                         flown->mtime = flow->mtime;
581                 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
582                         flown->ctime = flow->ctime;
583                 flown->tcp_flags |= flow->tcp_flags;
584                 flown->size += flow->size;
585                 flown->pkts += flow->pkts;
586                 if (flow->flags & FLOW_FRAG) {
587                         /* Fragmented flow require some additional work */
588                         if (flow->flags & FLOW_TL) {
589                                 /*
590                                    ?FIXME?
591                                    Several packets with FLOW_TL (attack)
592                                    */
593                                 flown->sp = flow->sp;
594                                 flown->dp = flow->dp;
595                         }
596                         if (flow->flags & FLOW_LASTFRAG) {
597                                 /*
598                                    ?FIXME?
599                                    Several packets with FLOW_LASTFRAG (attack)
600                                    */
601                                 flown->sizeP = flow->sizeP;
602                         }
603                         flown->flags |= flow->flags;
604                         flown->sizeF += flow->sizeF;
605                         if ((flown->flags & FLOW_LASTFRAG)
606                                         && (flown->sizeF >= flown->sizeP)) {
607                                 /* All fragments received - flow reassembled */
608                                 *flowpp = flown->next;
609                                 pthread_mutex_unlock(&flows_mutex[h]);
610 #if ((DEBUG) & DEBUG_I)
611                                 flows_total--;
612                                 flows_fragmented--;
613 #endif
614                                 flown->id = 0;
615                                 flown->flags &= ~FLOW_FRAG;
616 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
617                                 strcat(logbuf," R");
618 #endif
619                                 ret = put_into(flown, MOVE_INTO
620 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
621                                                 , logbuf
622 #endif
623                                               );
624                         }
625                 }
626                 if (flag == MOVE_INTO) mem_free(flow);
627         }
628         pthread_mutex_unlock(&flows_mutex[h]);
629         return ret;
630 }
631
632 int onlyonce=0;
633
634 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
635 {
636         int i;
637
638         for (i = 0; i < fields; i++) {
639 #if ((DEBUG) & DEBUG_F)
640                 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
641 #endif
642                 switch (format[i]) {
643                         case NETFLOW_IPV4_SRC_ADDR:
644                                 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
645                                 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
646                                 break;
647
648                         case NETFLOW_IPV4_DST_ADDR:
649                                 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
650                                 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
651                                         my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
652                                 }
653                                 p += NETFLOW_IPV4_DST_ADDR_SIZE;
654                                 break;
655
656                         case NETFLOW_INPUT_SNMP:
657                                 *((uint16_t *) p) = htons(flow->iif);
658                                 p += NETFLOW_INPUT_SNMP_SIZE;
659                                 break;
660
661                         case NETFLOW_OUTPUT_SNMP:
662                                 *((uint16_t *) p) = htons(flow->oif);
663                                 p += NETFLOW_OUTPUT_SNMP_SIZE;
664                                 break;
665
666                         case NETFLOW_PKTS_32:
667                                 *((uint32_t *) p) = htonl(flow->pkts);
668                                 p += NETFLOW_PKTS_32_SIZE;
669                                 break;
670
671                         case NETFLOW_BYTES_32:
672                                 *((uint32_t *) p) = htonl(flow->size);
673                                 p += NETFLOW_BYTES_32_SIZE;
674                                 break;
675
676                         case NETFLOW_FIRST_SWITCHED:
677                                 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
678                                 p += NETFLOW_FIRST_SWITCHED_SIZE;
679                                 break;
680
681                         case NETFLOW_LAST_SWITCHED:
682                                 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
683                                 p += NETFLOW_LAST_SWITCHED_SIZE;
684                                 break;
685
686                         case NETFLOW_L4_SRC_PORT:
687                                 *((uint16_t *) p) = flow->sp;
688                                 p += NETFLOW_L4_SRC_PORT_SIZE;
689                                 break;
690
691                         case NETFLOW_L4_DST_PORT:
692                                 *((uint16_t *) p) = flow->dp;
693                                 p += NETFLOW_L4_DST_PORT_SIZE;
694                                 break;
695
696                         case NETFLOW_PROT:
697                                 *((uint8_t *) p) = flow->proto;
698                                 p += NETFLOW_PROT_SIZE;
699                                 break;
700
701                         case NETFLOW_SRC_TOS:
702                                 *((uint8_t *) p) = flow->tos;
703                                 p += NETFLOW_SRC_TOS_SIZE;
704                                 break;
705
706                         case NETFLOW_TCP_FLAGS:
707                                 *((uint8_t *) p) = flow->tcp_flags;
708                                 p += NETFLOW_TCP_FLAGS_SIZE;
709                                 break;
710
711                         case NETFLOW_VERSION:
712                                 *((uint16_t *) p) = htons(netflow->Version);
713                                 p += NETFLOW_VERSION_SIZE;
714                                 break;
715
716                         case NETFLOW_COUNT:
717                                 *((uint16_t *) p) = htons(emit_count);
718                                 p += NETFLOW_COUNT_SIZE;
719                                 break;
720
721                         case NETFLOW_UPTIME:
722                                 *((uint32_t *) p) = htonl(getuptime(&emit_time));
723                                 p += NETFLOW_UPTIME_SIZE;
724                                 break;
725
726                         case NETFLOW_UNIX_SECS:
727                                 *((uint32_t *) p) = htonl(emit_time.sec);
728                                 p += NETFLOW_UNIX_SECS_SIZE;
729                                 break;
730
731                         case NETFLOW_UNIX_NSECS:
732                                 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
733                                 p += NETFLOW_UNIX_NSECS_SIZE;
734                                 break;
735
736                         case NETFLOW_FLOW_SEQUENCE:
737                                 //*((uint32_t *) p) = htonl(emit_sequence);
738                                 *((uint32_t *) p) = 0;
739                                 p += NETFLOW_FLOW_SEQUENCE_SIZE;
740                                 break;
741
742                         case NETFLOW_PAD8:
743                                 /* Unsupported (uint8_t) */
744                         case NETFLOW_ENGINE_TYPE:
745                         case NETFLOW_ENGINE_ID:
746                         case NETFLOW_FLAGS7_1:
747                         case NETFLOW_SRC_MASK:
748                         case NETFLOW_DST_MASK:
749                                 if (onlyonce) {
750                                         my_log(LOG_CRIT, "Adding SRC/DST masks: this version of fprobe is seriously broken\n");
751                                         onlyonce=1;
752                                 }
753                                 *((uint8_t *) p) = 0;
754                                 p += NETFLOW_PAD8_SIZE;
755                                 break;
756                         case NETFLOW_XID:
757                                 *((uint32_t *) p) = flow->xid;
758                                 p += NETFLOW_XID_SIZE;
759                                 break;
760                         case NETFLOW_PAD16:
761                                 /* Unsupported (uint16_t) */
762                         case NETFLOW_SRC_AS:
763                         case NETFLOW_DST_AS:
764                         case NETFLOW_FLAGS7_2:
765                                 *((uint16_t *) p) = 0;
766                                 p += NETFLOW_PAD16_SIZE;
767                                 break;
768
769                         case NETFLOW_PAD32:
770                                 /* Unsupported (uint32_t) */
771                         case NETFLOW_IPV4_NEXT_HOP:
772                         case NETFLOW_ROUTER_SC:
773                                 *((uint32_t *) p) = 0;
774                                 p += NETFLOW_PAD32_SIZE;
775                                 break;
776
777                         default:
778                                 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
779                                                 format, i, format[i]);
780                                 exit(1);
781                 }
782         }
783 #if ((DEBUG) & DEBUG_F)
784         my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
785 #endif
786         return p;
787 }
788
789 void setuser() {
790         /*
791            Workaround for clone()-based threads
792            Try to change EUID independently of main thread
793            */
794         if (pw) {
795                 setgroups(0, NULL);
796                 setregid(pw->pw_gid, pw->pw_gid);
797                 setreuid(pw->pw_uid, pw->pw_uid);
798         }
799 }
800
801 void *emit_thread()
802 {
803         struct Flow *flow;
804         void *p;
805         struct timeval now;
806         struct timespec timeout;
807         int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
808
809         p = (void *) &emit_packet + netflow->HeaderSize;
810         timeout.tv_nsec = 0;
811
812         setuser();
813
814         for (;;) {
815                 pthread_mutex_lock(&emit_mutex);
816                 while (!flows_emit) {
817                         gettimeofday(&now, 0);
818                         timeout.tv_sec = now.tv_sec + emit_timeout;
819                         /* Do not wait until emit_packet will filled - it may be too long */
820                         if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
821                                 pthread_mutex_unlock(&emit_mutex);
822                                 goto sendit;
823                         }
824                 }
825                 flow = flows_emit;
826                 flows_emit = flows_emit->next;
827 #if ((DEBUG) & DEBUG_I)
828                 emit_queue--;
829 #endif          
830                 pthread_mutex_unlock(&emit_mutex);
831
832 #ifdef UPTIME_TRICK
833                 if (!emit_count) {
834                         gettime(&start_time);
835                         start_time.sec -= start_time_offset;
836                 }
837 #endif
838                 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
839                 mem_free(flow);
840                 emit_count++;
841 #ifdef PF2_DEBUG
842                 printf("Emit count = %d\n", emit_count);
843                 fflush(stdout);
844 #endif
845                 if (emit_count == netflow->MaxFlows) {
846 sendit:
847                         gettime(&emit_time);
848                         p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
849                         size = netflow->HeaderSize + emit_count * netflow->FlowSize;
850                         /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
851 #ifdef STD_NETFLOW_PDU
852                         if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
853 #endif
854                         peer_rot_cur = 0;
855                         for (i = 0; i < npeers; i++) {
856                                 if (peers[i].type == PEER_FILE) {
857                                         if (netflow->SeqOffset)
858                                                 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
859                                         peers[i].write_fd = get_data_file_fd(peers[i].fname, peers[i].write_fd);
860                                         ret = write(peers[i].write_fd, emit_packet, size);
861                                         if (ret < size) {
862
863 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
864                                                 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
865                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
866 #endif
867 #undef MESSAGES
868                                         }
869 #if ((DEBUG) & DEBUG_E)
870                                         commaneelse {
871                                                 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
872                                                                 emit_count, i + 1, peers[i].seq);
873                                         }
874 #endif
875                                         peers[i].seq += emit_count;
876
877                                         /* Rate limit */
878                                         if (emit_rate_bytes) {
879                                                 sent += size;
880                                                 delay = sent / emit_rate_bytes;
881                                                 if (delay) {
882                                                         sent %= emit_rate_bytes;
883                                                         timeout.tv_sec = 0;
884                                                         timeout.tv_nsec = emit_rate_delay * delay;
885                                                         while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
886                                                 }
887                                         }
888                                 }
889                                 else
890                                         if (peers[i].type == PEER_MIRROR) goto sendreal;
891                                         else
892                                                 if (peers[i].type == PEER_ROTATE) 
893                                                         if (peer_rot_cur++ == peer_rot_work) {
894 sendreal:
895                                                                 if (netflow->SeqOffset)
896                                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
897                                                                 ret = send(peers[i].write_fd, emit_packet, size, 0);
898                                                                 if (ret < size) {
899 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
900                                                                         my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
901                                                                                         i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
902 #endif
903                                                                 }
904 #if ((DEBUG) & DEBUG_E)
905                                                                 commaneelse {
906                                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
907                                                                                         emit_count, i + 1, peers[i].seq);
908                                                                 }
909 #endif
910                                                                 peers[i].seq += emit_count;
911
912                                                                 /* Rate limit */
913                                                                 if (emit_rate_bytes) {
914                                                                         sent += size;
915                                                                         delay = sent / emit_rate_bytes;
916                                                                         if (delay) {
917                                                                                 sent %= emit_rate_bytes;
918                                                                                 timeout.tv_sec = 0;
919                                                                                 timeout.tv_nsec = emit_rate_delay * delay;
920                                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
921                                                                         }
922                                                                 }
923                                                         }
924                         }
925                         if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
926                         emit_sequence += emit_count;
927                         emit_count = 0;
928 #if ((DEBUG) & DEBUG_I)
929                         emit_pkts++;
930 #endif
931                 }
932         }
933 }       
934
935 void *unpending_thread()
936 {
937         struct timeval now;
938         struct timespec timeout;
939 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
940         char logbuf[256];
941 #endif
942
943         setuser();
944
945         timeout.tv_nsec = 0;
946         pthread_mutex_lock(&unpending_mutex);
947
948         for (;;) {
949                 while (!(pending_tail->flags & FLOW_PENDING)) {
950                         gettimeofday(&now, 0);
951                         timeout.tv_sec = now.tv_sec + unpending_timeout;
952                         pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
953                 }
954
955 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
956                 *logbuf = 0;
957 #endif
958                 if (put_into(pending_tail, COPY_INTO
959 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
960                                         , logbuf
961 #endif
962                             ) < 0) {
963 #if ((DEBUG) & DEBUG_I)
964                         pkts_lost_unpending++;
965 #endif                          
966                 }
967
968 #if ((DEBUG) & DEBUG_U)
969                 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
970 #endif
971
972                 pending_tail->flags = 0;
973                 pending_tail = pending_tail->next;
974 #if ((DEBUG) & DEBUG_I)
975                 pkts_pending_done++;
976 #endif
977         }
978 }
979
980 void *scan_thread()
981 {
982 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
983         char logbuf[256];
984 #endif
985         int i;
986         struct Flow *flow, **flowpp;
987         struct Time now;
988         struct timespec timeout;
989
990         setuser();
991
992         timeout.tv_nsec = 0;
993         pthread_mutex_lock(&scan_mutex);
994
995         for (;;) {
996                 gettime(&now);
997                 timeout.tv_sec = now.sec + scan_interval;
998                 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
999
1000                 gettime(&now);
1001 #if ((DEBUG) & DEBUG_S)
1002                 my_log(LOG_DEBUG, "S: %d", now.sec);
1003 #endif
1004                 for (i = 0; i < 1 << HASH_BITS ; i++) {
1005                         pthread_mutex_lock(&flows_mutex[i]);
1006                         flow = flows[i];
1007                         flowpp = &flows[i];
1008                         while (flow) {
1009                                 if (flow->flags & FLOW_FRAG) {
1010                                         /* Process fragmented flow */
1011                                         if ((now.sec - flow->mtime.sec) > frag_lifetime) {
1012                                                 /* Fragmented flow expired - put it into special chain */
1013 #if ((DEBUG) & DEBUG_I)
1014                                                 flows_fragmented--;
1015                                                 flows_total--;
1016 #endif
1017                                                 *flowpp = flow->next;
1018                                                 flow->id = 0;
1019                                                 flow->flags &= ~FLOW_FRAG;
1020                                                 flow->next = scan_frag_dreg;
1021                                                 scan_frag_dreg = flow;
1022                                                 flow = *flowpp;
1023                                                 continue;
1024                                         }
1025                                 } else {
1026                                         /* Flow is not frgamented */
1027                                         if ((now.sec - flow->mtime.sec) > inactive_lifetime
1028                                                         || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
1029                                                 /* Flow expired */
1030 #if ((DEBUG) & DEBUG_S)
1031                                                 my_log(LOG_DEBUG, "S: E %x", flow);
1032 #endif
1033 #if ((DEBUG) & DEBUG_I)
1034                                                 flows_total--;
1035 #endif
1036                                                 *flowpp = flow->next;
1037                                                 pthread_mutex_lock(&emit_mutex);
1038                                                 flow->next = flows_emit;
1039                                                 flows_emit = flow;
1040 #if ((DEBUG) & DEBUG_I)
1041                                                 emit_queue++;
1042 #endif                          
1043                                                 pthread_mutex_unlock(&emit_mutex);
1044                                                 flow = *flowpp;
1045                                                 continue;
1046                                         }
1047                                 }
1048                                 flowpp = &flow->next;
1049                                 flow = flow->next;
1050                         } /* chain loop */
1051                         pthread_mutex_unlock(&flows_mutex[i]);
1052                 } /* hash loop */
1053                 if (flows_emit) pthread_cond_signal(&emit_cond);
1054
1055                 while (scan_frag_dreg) {
1056                         flow = scan_frag_dreg;
1057                         scan_frag_dreg = flow->next;
1058 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1059                         *logbuf = 0;
1060 #endif
1061                         put_into(flow, MOVE_INTO
1062 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1063                                         , logbuf
1064 #endif
1065                                 );
1066 #if ((DEBUG) & DEBUG_S)
1067                         my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
1068 #endif
1069                 }
1070         }
1071 }
1072
1073 void *cap_thread()
1074 {
1075         struct ulog_packet_msg *ulog_msg;
1076         struct ip *nl;
1077         void *tl;
1078         struct Flow *flow;
1079         int len, off_frag, psize;
1080 #if ((DEBUG) & DEBUG_C)
1081         char buf[64];
1082         char logbuf[256];
1083 #endif
1084         int challenge;
1085
1086         setuser();
1087
1088         while (!killed) {
1089                 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1090                 if (len <= 0) {
1091                         my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1092                         continue;
1093                 }
1094                 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1095
1096 #if ((DEBUG) & DEBUG_C)
1097                         sprintf(logbuf, "C: %d", ulog_msg->data_len);
1098 #endif
1099
1100                         nl = (void *) &ulog_msg->payload;
1101                         psize = ulog_msg->data_len;
1102
1103                         /* Sanity check */
1104                         if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1105 #if ((DEBUG) & DEBUG_C)
1106                                 strcat(logbuf, " U");
1107                                 my_log(LOG_DEBUG, "%s", logbuf);
1108 #endif
1109 #if ((DEBUG) & DEBUG_I)
1110                                 pkts_ignored++;
1111 #endif
1112                                 continue;
1113                         }
1114
1115                         if (pending_head->flags) {
1116 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1117                                 my_log(LOG_ERR,
1118 # if ((DEBUG) & DEBUG_C)
1119                                                 "%s %s %s", logbuf,
1120 # else
1121                                                 "%s %s",
1122 # endif
1123                                                 "pending queue full:", "packet lost");
1124 #endif
1125 #if ((DEBUG) & DEBUG_I)
1126                                 pkts_lost_capture++;
1127 #endif
1128                                 goto done;
1129                         }
1130
1131 #if ((DEBUG) & DEBUG_I)
1132                         pkts_total++;
1133 #endif
1134
1135                         flow = pending_head;
1136
1137                         /* ?FIXME? Add sanity check for ip_len? */
1138                         flow->size = ntohs(nl->ip_len);
1139 #if ((DEBUG) & DEBUG_I)
1140                         size_total += flow->size;
1141 #endif
1142
1143                         flow->sip = nl->ip_src;
1144                         flow->dip = nl->ip_dst;
1145                         flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1146                         
1147                         /* It's going to be expensive calling this syscall on every flow.
1148                          * We should keep a local hash table, for now just bear the overhead... - Sapan*/
1149
1150                         flow->xid=0;
1151
1152                         if (ulog_msg->mark > 0) {
1153                                 flow->xid = get_vhi_name(ulog_msg->mark);
1154                                 challenge = get_vhi_name(ulog_msg->mark);
1155                         }
1156
1157                         if (flow->xid < 1 || flow->xid!=challenge) 
1158                                 flow->xid = ulog_msg->mark;
1159
1160
1161                         if ((flow->dip.s_addr == inet_addr("64.34.177.39")) || (flow->sip.s_addr == inet_addr("64.34.177.39"))) {
1162                                 my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->xid);
1163                         }
1164                         flow->iif = snmp_index(ulog_msg->indev_name);
1165                         flow->oif = snmp_index(ulog_msg->outdev_name);
1166                         flow->proto = nl->ip_p;
1167                         flow->id = 0;
1168                         flow->tcp_flags = 0;
1169                         flow->pkts = 1;
1170                         flow->sizeF = 0;
1171                         flow->sizeP = 0;
1172                         /* Packets captured from OUTPUT table didn't contains valid timestamp */
1173                         if (ulog_msg->timestamp_sec) {
1174                                 flow->ctime.sec = ulog_msg->timestamp_sec;
1175                                 flow->ctime.usec = ulog_msg->timestamp_usec;
1176                         } else gettime(&flow->ctime);
1177                         flow->mtime = flow->ctime;
1178
1179                         off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1180
1181                         /*
1182                            Offset (from network layer) to transport layer header/IP data
1183                            IOW IP header size ;-)
1184
1185                            ?FIXME?
1186                            Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1187                            */
1188                         off_tl = nl->ip_hl << 2;
1189                         tl = (void *) nl + off_tl;
1190
1191                         /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1192                         flow->sizeF = ntohs(nl->ip_len) - off_tl;
1193                         psize -= off_tl;
1194                         if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1195                         if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1196
1197                         if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1198                                 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1199 #if ((DEBUG) & DEBUG_C)
1200                                 strcat(logbuf, " F");
1201 #endif
1202 #if ((DEBUG) & DEBUG_I)
1203                                 pkts_total_fragmented++;
1204 #endif
1205                                 flow->flags |= FLOW_FRAG;
1206                                 flow->id = nl->ip_id;
1207
1208                                 if (!(ntohs(nl->ip_off) & IP_MF)) {
1209                                         /* Packet whith IP_MF contains information about whole datagram size */
1210                                         flow->flags |= FLOW_LASTFRAG;
1211                                         /* size = frag_offset*8 + data_size */
1212                                         flow->sizeP = off_frag + flow->sizeF;
1213                                 }
1214                         }
1215
1216 #if ((DEBUG) & DEBUG_C)
1217                         sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1218                         strcat(logbuf, buf);
1219                         sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1220                         strcat(logbuf, buf);
1221 #endif
1222
1223                         /*
1224                            Fortunately most interesting transport layer information fit
1225                            into first 8 bytes of IP data field (minimal nonzero size).
1226                            Thus we don't need actual packet reassembling to build whole
1227                            transport layer data. We only check the fragment offset for
1228                            zero value to find packet with this information.
1229                            */
1230                         if (!off_frag && psize >= 8) {
1231                                 switch (flow->proto) {
1232                                         case IPPROTO_TCP:
1233                                         case IPPROTO_UDP:
1234                                                 flow->sp = ((struct udphdr *)tl)->uh_sport;
1235                                                 flow->dp = ((struct udphdr *)tl)->uh_dport;
1236                                                 goto tl_known;
1237
1238 #ifdef ICMP_TRICK
1239                                         case IPPROTO_ICMP:
1240                                                 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1241                                                 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1242                                                 goto tl_known;
1243 #endif
1244 #ifdef ICMP_TRICK_CISCO
1245                                         case IPPROTO_ICMP:
1246                                                 flow->dp = *((int32_t *) tl);
1247                                                 goto tl_known;
1248 #endif
1249
1250                                         default:
1251                                                 /* Unknown transport layer */
1252 #if ((DEBUG) & DEBUG_C)
1253                                                 strcat(logbuf, " U");
1254 #endif
1255                                                 flow->sp = 0;
1256                                                 flow->dp = 0;
1257                                                 break;
1258
1259 tl_known:
1260 #if ((DEBUG) & DEBUG_C)
1261                                                 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1262                                                 strcat(logbuf, buf);
1263 #endif
1264                                                 flow->flags |= FLOW_TL;
1265                                 }
1266                         }
1267
1268                         /* Check for tcp flags presence (including CWR and ECE). */
1269                         if (flow->proto == IPPROTO_TCP
1270                                         && off_frag < 16
1271                                         && psize >= 16 - off_frag) {
1272                                 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1273 #if ((DEBUG) & DEBUG_C)
1274                                 sprintf(buf, " TCP:%x", flow->tcp_flags);
1275                                 strcat(logbuf, buf);
1276 #endif
1277                         }
1278
1279 #if ((DEBUG) & DEBUG_C)
1280                         sprintf(buf, " => %x", (unsigned) flow);
1281                         strcat(logbuf, buf);
1282                         my_log(LOG_DEBUG, "%s", logbuf);
1283 #endif
1284
1285 #if ((DEBUG) & DEBUG_I)
1286                         pkts_pending++;
1287                         pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1288                         if (pending_queue_trace < pending_queue_trace_candidate)
1289                                 pending_queue_trace = pending_queue_trace_candidate;
1290 #endif
1291
1292                         /* Flow complete - inform unpending_thread() about it */
1293                         pending_head->flags |= FLOW_PENDING;
1294                         pending_head = pending_head->next;
1295 done:
1296                         pthread_cond_signal(&unpending_cond);
1297                 }
1298         }
1299         return 0;
1300 }
1301
1302 /* Copied out of CoDemux */
1303
1304 static int init_daemon() {
1305         pid_t pid;
1306         FILE *pidfile;
1307
1308         pidfile = fopen(PIDFILE, "w");
1309         if (pidfile == NULL) {
1310                 my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
1311         }
1312
1313         if ((pid = fork()) < 0) {
1314                 fclose(pidfile);
1315                 my_log(LOG_ERR, "Could not fork!\n");
1316                 return(-1);
1317         }
1318         else if (pid != 0) {
1319                 /* i'm the parent, writing down the child pid  */
1320                 fprintf(pidfile, "%u\n", pid);
1321                 fclose(pidfile);
1322                 exit(0);
1323         }
1324
1325         /* close the pid file */
1326         fclose(pidfile);
1327
1328         /* routines for any daemon process
1329            1. create a new session 
1330            2. change directory to the root
1331            3. change the file creation permission 
1332            */
1333         setsid();
1334         chdir("/var/local/fprobe");
1335         umask(0);
1336
1337         return(0);
1338 }
1339
1340 int main(int argc, char **argv)
1341 {
1342         char errpbuf[512];
1343         char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1344         int c, i, write_fd, memory_limit = 0;
1345         struct addrinfo hints, *res;
1346         struct sockaddr_in saddr;
1347         pthread_attr_t tattr;
1348         struct sigaction sigact;
1349         static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1350         struct timeval timeout;
1351
1352         sched_min = sched_get_priority_min(SCHED);
1353         sched_max = sched_get_priority_max(SCHED);
1354
1355         memset(&saddr, 0 , sizeof(saddr));
1356         memset(&hints, 0 , sizeof(hints));
1357         hints.ai_flags = AI_PASSIVE;
1358         hints.ai_family = AF_INET;
1359         hints.ai_socktype = SOCK_DGRAM;
1360
1361         /* Process command line options */
1362
1363         opterr = 0;
1364         while ((c = my_getopt(argc, argv, parms)) != -1) {
1365                 switch (c) {
1366                         case '?':
1367                                 usage();
1368
1369                         case 'h':
1370                                 usage();
1371                 }
1372         }
1373
1374         if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1375         if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1376         if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1377         if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1378         if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1379         if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1380         if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1381         if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1382         if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1383         if (parms[nflag].count) {
1384                 switch (atoi(parms[nflag].arg)) {
1385                         case 1:
1386                                 netflow = &NetFlow1;
1387                                 break;
1388
1389                         case 5:
1390                                 break;
1391
1392                         case 7:
1393                                 netflow = &NetFlow7;
1394                                 break;
1395
1396                         default:
1397                                 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1398                                 exit(1);
1399                 }
1400         }
1401         if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1402         if (parms[lflag].count) {
1403                 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1404                         *log_suffix++ = 0;
1405                         if (*log_suffix) {
1406                                 sprintf(errpbuf, "[%s]", log_suffix);
1407                                 strcat(ident, errpbuf);
1408                         }
1409                 }
1410                 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1411                 if (log_suffix) *--log_suffix = ':';
1412         }
1413         if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1414 err_malloc:
1415                 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1416                 exit(1);
1417         }
1418         sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1419         if (parms[qflag].count) {
1420                 pending_queue_length = atoi(parms[qflag].arg);
1421                 if (pending_queue_length < 1) {
1422                         fprintf(stderr, "Illegal %s\n", "pending queue length");
1423                         exit(1);
1424                 }
1425         }
1426         if (parms[rflag].count) {
1427                 schedp.sched_priority = atoi(parms[rflag].arg);
1428                 if (schedp.sched_priority
1429                                 && (schedp.sched_priority < sched_min
1430                                         || schedp.sched_priority > sched_max)) {
1431                         fprintf(stderr, "Illegal %s\n", "realtime priority");
1432                         exit(1);
1433                 }
1434         }
1435         if (parms[Bflag].count) {
1436                 sockbufsize = atoi(parms[Bflag].arg) << 10;
1437         }
1438         if (parms[bflag].count) {
1439                 bulk_quantity = atoi(parms[bflag].arg);
1440                 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1441                         fprintf(stderr, "Illegal %s\n", "bulk size");
1442                         exit(1);
1443                 }
1444         }
1445         if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1446         if (parms[Xflag].count) {
1447                 for(i = 0; parms[Xflag].arg[i]; i++)
1448                         if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1449                 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1450                         goto err_malloc;
1451                 rule = strtok(parms[Xflag].arg, ":");
1452                 for (i = 0; rule; i++) {
1453                         snmp_rules[i].len = strlen(rule);
1454                         if (snmp_rules[i].len > IFNAMSIZ) {
1455                                 fprintf(stderr, "Illegal %s\n", "interface basename");
1456                                 exit(1);
1457                         }
1458                         strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1459                         if (!*(rule - 1)) *(rule - 1) = ',';
1460                         rule = strtok(NULL, ",");
1461                         if (!rule) {
1462                                 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1463                                 exit(1);
1464                         }
1465                         snmp_rules[i].base = atoi(rule);
1466                         *(rule - 1) = ':';
1467                         rule = strtok(NULL, ":");
1468                 }
1469                 nsnmp_rules = i;
1470         }
1471         if (parms[tflag].count)
1472                 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1473         if (parms[aflag].count) {
1474                 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1475 bad_lhost:
1476                         fprintf(stderr, "Illegal %s\n", "source address");
1477                         exit(1);
1478                 } else {
1479                         saddr = *((struct sockaddr_in *) res->ai_addr);
1480                         freeaddrinfo(res);
1481                 }
1482         }
1483         if (parms[uflag].count) 
1484                 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1485                         fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1486                         exit(1);
1487                 }
1488
1489
1490         /* Process collectors parameters. Brrrr... :-[ */
1491
1492         npeers = argc - optind;
1493         if (npeers > 1) {
1494                 /* Send to remote Netflow collector */
1495                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1496                 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1497                         dhost = argv[i];
1498                         if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1499                         *dport++ = 0;
1500                         if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1501                                 fprintf(stderr, "socket(): %s\n", strerror(errno));
1502                                 exit(1);
1503                         }
1504                         peers[npeers].write_fd = write_fd;
1505                         peers[npeers].type = PEER_MIRROR;
1506                         peers[npeers].laddr = saddr;
1507                         peers[npeers].seq = 0;
1508                         if ((lhost = strchr(dport, '/'))) {
1509                                 *lhost++ = 0;
1510                                 if ((type = strchr(lhost, '/'))) {
1511                                         *type++ = 0;
1512                                         switch (*type) {
1513                                                 case 0:
1514                                                 case 'm':
1515                                                         break;
1516
1517                                                 case 'r':
1518                                                         peers[npeers].type = PEER_ROTATE;
1519                                                         npeers_rot++;
1520                                                         break;
1521
1522                                                 default:
1523                                                         goto bad_collector;
1524                                         }
1525                                 }
1526                                 if (*lhost) {
1527                                         if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1528                                         peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1529                                         freeaddrinfo(res);
1530                                 }
1531                         }
1532                         if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1533                                                 sizeof(struct sockaddr_in))) {
1534                                 fprintf(stderr, "bind(): %s\n", strerror(errno));
1535                                 exit(1);
1536                         }
1537                         if (getaddrinfo(dhost, dport, &hints, &res)) {
1538 bad_collector:
1539                                 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1540                                 exit(1);
1541                         }
1542                         peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1543                         freeaddrinfo(res);
1544                         if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1545                                                 sizeof(struct sockaddr_in))) {
1546                                 fprintf(stderr, "connect(): %s\n", strerror(errno));
1547                                 exit(1);
1548                         }
1549
1550                         /* Restore command line */
1551                         if (type) *--type = '/';
1552                         if (lhost) *--lhost = '/';
1553                         *--dport = ':';
1554                 }
1555         }
1556         else if (parms[fflag].count) {
1557                 // log into a file
1558                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1559                 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1560                 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1561
1562                 peers[npeers].write_fd = START_DATA_FD;
1563                 peers[npeers].type = PEER_FILE;
1564                 peers[npeers].seq = 0;
1565
1566                 read_cur_epoch();
1567                 npeers++;
1568         }
1569         else 
1570                 usage();
1571
1572
1573         if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1574         ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1575         if (!ulog_handle) {
1576                 fprintf(stderr, "libipulog initialization error: %s",
1577                                 ipulog_strerror(ipulog_errno));
1578                 exit(1);
1579         }
1580         if (sockbufsize)
1581                 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1582                                         &sockbufsize, sizeof(sockbufsize)) < 0)
1583                         fprintf(stderr, "setsockopt(): %s", strerror(errno));
1584
1585         /* Daemonize (if log destination stdout-free) */
1586
1587         my_log_open(ident, verbosity, log_dest);
1588
1589         init_daemon();
1590
1591         if (!(log_dest & 2)) {
1592                 /* Crash-proofing - Sapan*/
1593                 while (1) {
1594                         int pid=fork();
1595                         if (pid==-1) {
1596                                 fprintf(stderr, "fork(): %s", strerror(errno));
1597                                 exit(1);
1598                         }
1599                         else if (pid==0) {
1600                                 setsid();
1601                                 freopen("/dev/null", "r", stdin);
1602                                 freopen("/dev/null", "w", stdout);
1603                                 freopen("/dev/null", "w", stderr);
1604                                 break;
1605                         }
1606                         else {
1607                                 while (wait3(NULL,0,NULL) < 1);
1608                         }
1609                 }
1610         } else {
1611                 setvbuf(stdout, (char *)0, _IONBF, 0);
1612                 setvbuf(stderr, (char *)0, _IONBF, 0);
1613         }
1614
1615         pid = getpid();
1616         sprintf(errpbuf, "[%ld]", (long) pid);
1617         strcat(ident, errpbuf);
1618
1619         /* Initialization */
1620
1621         hash_init(); /* Actually for crc16 only */
1622         mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1623         for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1624
1625 #ifdef UPTIME_TRICK
1626         /* Hope 12 days is enough :-/ */
1627         start_time_offset = 1 << 20;
1628
1629         /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1630 #endif
1631         gettime(&start_time);
1632
1633         /*
1634            Build static pending queue as circular buffer.
1635            */
1636         if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1637         pending_tail = pending_head;
1638         for (i = pending_queue_length - 1; i--;) {
1639                 if (!(pending_tail->next = mem_alloc())) {
1640 err_mem_alloc:
1641                         my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1642                         exit(1);
1643                 }
1644                 pending_tail = pending_tail->next;
1645         }
1646         pending_tail->next = pending_head;
1647         pending_tail = pending_head;
1648
1649         sigemptyset(&sig_mask);
1650         sigact.sa_handler = &sighandler;
1651         sigact.sa_mask = sig_mask;
1652         sigact.sa_flags = 0;
1653         sigaddset(&sig_mask, SIGTERM);
1654         sigaction(SIGTERM, &sigact, 0);
1655 #if ((DEBUG) & DEBUG_I)
1656         sigaddset(&sig_mask, SIGUSR1);
1657         sigaction(SIGUSR1, &sigact, 0);
1658 #endif
1659         if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1660                 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1661                 exit(1);
1662         }
1663
1664         my_log(LOG_INFO, "Starting %s...", VERSION);
1665
1666         if (parms[cflag].count) {
1667                 if (chdir(parms[cflag].arg) || chroot(".")) {
1668                         my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1669                         exit(1);
1670                 }
1671         }
1672
1673         schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1674         pthread_attr_init(&tattr);
1675         for (i = 0; i < THREADS - 1; i++) {
1676                 if (schedp.sched_priority > 0) {
1677                         if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1678                                         (pthread_attr_setschedparam(&tattr, &schedp))) {
1679                                 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1680                                 exit(1);
1681                         }
1682                 }
1683                 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1684                         my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1685                         exit(1);
1686                 }
1687                 pthread_detach(thid);
1688                 schedp.sched_priority++;
1689         }
1690
1691         if (pw) {
1692                 if (setgroups(0, NULL)) {
1693                         my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1694                         exit(1);
1695                 }
1696                 if (setregid(pw->pw_gid, pw->pw_gid)) {
1697                         my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1698                         exit(1);
1699                 }
1700                 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1701                         my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1702                         exit(1);
1703                 }
1704         }
1705
1706         if (!(pidfile = fopen(pidfilepath, "w")))
1707                 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1708         else {
1709                 fprintf(pidfile, "%ld\n", (long) pid);
1710                 fclose(pidfile);
1711         }
1712
1713         my_log(LOG_INFO, "pid: %d", pid);
1714         my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1715                         "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1716                         ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1717                         netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1718                         memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1719                         emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1720                         parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1721         for (i = 0; i < nsnmp_rules; i++) {
1722                 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1723                                 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1724         }
1725         for (i = 0; i < npeers; i++) {
1726                 switch (peers[i].type) {
1727                         case PEER_MIRROR:
1728                                 c = 'm';
1729                                 break;
1730                         case PEER_ROTATE:
1731                                 c = 'r';
1732                                 break;
1733                 }
1734                 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1735                 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1736                                 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1737         }
1738
1739         pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1740
1741         timeout.tv_usec = 0;
1742         while (!killed
1743                         || (total_elements - free_elements - pending_queue_length)
1744                         || emit_count
1745                         || pending_tail->flags) {
1746
1747                 if (!sigs) {
1748                         timeout.tv_sec = scan_interval;
1749                         select(0, 0, 0, 0, &timeout);
1750                 }
1751
1752                 if (sigs & SIGTERM_MASK && !killed) {
1753                         sigs &= ~SIGTERM_MASK;
1754                         my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1755                         scan_interval = 1;
1756                         frag_lifetime = -1;
1757                         active_lifetime = -1;
1758                         inactive_lifetime = -1;
1759                         emit_timeout = 1;
1760                         unpending_timeout = 1;
1761                         killed = 1;
1762                         pthread_cond_signal(&scan_cond);
1763                         pthread_cond_signal(&unpending_cond);
1764                 }
1765
1766 #if ((DEBUG) & DEBUG_I)
1767                 if (sigs & SIGUSR1_MASK) {
1768                         sigs &= ~SIGUSR1_MASK;
1769                         info_debug();
1770                 }
1771 #endif
1772         }
1773         remove(pidfilepath);
1774 #if ((DEBUG) & DEBUG_I)
1775         info_debug();
1776 #endif
1777         my_log(LOG_INFO, "Done.");
1778 #ifdef WALL
1779         return 0;
1780 #endif
1781 }