Modify bits for old files also.
[fprobe-ulog.git] / src / fprobe-ulog.c
1 /*
2         Copyright (C) Slava Astashonok <sla@0n.ru>
3
4         This program is free software; you can redistribute it and/or
5         modify it under the terms of the GNU General Public License.
6
7         $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
8
9                         Sapan Bhatia <sapanb@cs.princeton.edu> 
10                         
11         7/11/2007       Added data collection (-f) functionality, xid support in the header and log file
12                         rotation.
13
14         15/11/2007      Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
15
16 */
17
18 #include <common.h>
19
20 /* stdout, stderr, freopen() */
21 #include <stdio.h>
22
23 /* atoi(), exit() */
24 #include <stdlib.h>
25
/* getopt(), alarm(), getpid(), setsid(), chdir() */
27 #include <unistd.h>
28
29 /* strerror() */
30 #include <string.h>
31
32 /* sig*() */
33 #include <signal.h>
34
35 /* statfs() */
36
37 #include <sys/vfs.h>
38
39 #include <libipulog/libipulog.h>
40 #include "vserver.h"
41
42 struct ipulog_handle {
43         int fd;
44         u_int8_t blocking;
45         struct sockaddr_nl local;
46         struct sockaddr_nl peer;
47         struct nlmsghdr* last_nlhdr;
48 };
49
50 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
51 #include <sys/types.h>
52 #include <netinet/in_systm.h>
53 #include <sys/socket.h>
54 #include <netinet/in.h>
55 #include <arpa/inet.h>
56 #include <netinet/ip.h>
57 #include <netinet/tcp.h>
58 #include <netinet/udp.h>
59 #include <netinet/ip_icmp.h>
60 #include <net/if.h>
61
62 #include <sys/param.h>
63 #include <pwd.h>
64 #ifdef OS_LINUX
65 #include <grp.h>
66 #endif
67
68 /* pthread_*() */
69 #include <pthread.h>
70
71 /* errno */
72 #include <errno.h>
73
74 /* getaddrinfo() */
75 #include <netdb.h>
76
77 /* nanosleep() */
78 #include <time.h>
79
80 /* gettimeofday() */
81 #include <sys/time.h>
82
83 /* scheduling */
84 #include <sched.h>
85
86 /* select() (POSIX)*/
87 #include <sys/select.h>
88
89 /* open() */
90 #include <sys/stat.h>
91 #include <fcntl.h>
92
93 #include <fprobe-ulog.h>
94 #include <my_log.h>
95 #include <my_getopt.h>
96 #include <netflow.h>
97 #include <hash.h>
98 #include <mem.h>
99
100 #define PIDFILE                 "/var/log/fprobe-ulog.pid"
101 #define LAST_EPOCH_FILE         "/var/log/fprobe_last_epoch"
102 #define MAX_EPOCH_SIZE          sizeof("32767")
103 #define STD_NETFLOW_PDU
104
105 enum {
106         aflag,
107         Bflag,
108         bflag,
109         cflag,
110         dflag,
111         Dflag,
112         eflag,
113         Eflag,
114         fflag,
115         gflag,
116         hflag,
117         lflag,
118         mflag,
119         Mflag,
120         nflag,
121         qflag,
122         rflag,
123         sflag,
124         tflag,
125         Tflag,
126         Uflag,
127         uflag,
128         vflag,
129         Wflag,
130         Xflag,
131 };
132
133 static struct getopt_parms parms[] = {
134         {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
135         {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
136         {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
137         {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
138         {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
139         {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
140         {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
141         {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
142         {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
143         {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
144         {'h', 0, 0, 0},
145         {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
146         {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
147         {'M', 0, 0, 0},
148         {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
149         {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
150         {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
151         {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
152         {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
153         {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
154         {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
155         {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
156         {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
157         {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
158         {0, 0, 0, 0}
159 };
160
/* getopt() state exported by the C library. */
extern char *optarg;
extern int optind, opterr, optopt;
/*
 * errno is deliberately not redeclared here: <errno.h> (included above)
 * provides it, and it may be a macro rather than a plain int object.
 */

/* NetFlow version descriptors, defined outside this file. */
extern struct NetFlow NetFlow1;
extern struct NetFlow NetFlow5;
extern struct NetFlow NetFlow7;
168
169 #define START_DATA_FD -5
170 #define mark_is_tos parms[Mflag].count
171 static unsigned scan_interval = 5;
172 static unsigned int min_free = 0;
173 static int frag_lifetime = 30;
174 static int inactive_lifetime = 60;
175 static int active_lifetime = 300;
176 static int sockbufsize;
177 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
178 #if (MEM_BITS == 0) || (MEM_BITS == 16)
179 #define BULK_QUANTITY 10000
180 #else
181 #define BULK_QUANTITY 200
182 #endif
183
184 static unsigned epoch_length=60, log_epochs=1; 
185 static unsigned cur_epoch=0,prev_uptime=0,last_peak=0;
186
187 static unsigned bulk_quantity = BULK_QUANTITY;
188 static unsigned pending_queue_length = 100;
189 static struct NetFlow *netflow = &NetFlow5;
190 static unsigned verbosity = 6;
191 static unsigned log_dest = MY_LOG_SYSLOG;
192 static struct Time start_time;
193 static long start_time_offset;
194 static int off_tl;
195 /* From mem.c */
196 extern unsigned total_elements;
197 extern unsigned free_elements;
198 extern unsigned total_memory;
199 #if ((DEBUG) & DEBUG_I)
200 static unsigned emit_pkts, emit_queue;
201 static uint64_t size_total;
202 static unsigned pkts_total, pkts_total_fragmented;
203 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
204 static unsigned pkts_pending, pkts_pending_done;
205 static unsigned pending_queue_trace, pending_queue_trace_candidate;
206 static unsigned flows_total, flows_fragmented;
207 #endif
208 static unsigned emit_count;
209 static uint32_t emit_sequence;
210 static unsigned emit_rate_bytes, emit_rate_delay;
211 static struct Time emit_time;
212 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
213 static pthread_t thid;
214 static sigset_t sig_mask;
215 static struct sched_param schedp;
216 static int sched_min, sched_max;
217 static int npeers, npeers_rot;
218 static struct peer *peers;
219 static int sigs;
220
/* Flow cache: one singly linked chain per hash bucket, indexed by hash_flow(). */
static struct Flow *flows[1 << HASH_BITS];
/* One mutex per bucket; put_into() holds flows_mutex[h] while touching flows[h]. */
static pthread_mutex_t flows_mutex[1 << HASH_BITS];

/* NOTE(review): the users of the "unpending" pair are outside this excerpt. */
static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;

/* NOTE(review): the users of the "scan" pair and the pending list are outside this excerpt. */
static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
static struct Flow *pending_head, *pending_tail;
static struct Flow *scan_frag_dreg;

/*
 * Queue of flows waiting to be exported.  emit_thread() pops the head of
 * flows_emit under emit_mutex and waits on emit_cond while the list is empty.
 */
static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
static struct Flow *flows_emit;
235
/* Process-wide state.  Most of it is set up outside this excerpt. */
static char ident[256] = "fprobe-ulog";
static FILE *pidfile;
static char *pidfilepath;
static pid_t pid;
static int killed;
/* emit_timeout: seconds emit_thread() waits on emit_cond before flushing a partial packet. */
static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
static struct ipulog_handle *ulog_handle;
/* Default matches the "[1]" shown for -U in usage(). */
static uint32_t ulog_gmask = 1;
static char *cap_buf;
/* Interface-name to SNMP-index rules consulted by snmp_index(). */
static int nsnmp_rules;
static struct snmp_rule *snmp_rules;
/* Account to switch to in setuser(); NULL (0) means keep the current identity. */
static struct passwd *pw = 0;
248
249 void usage()
250 {
251         fprintf(stdout,
252                         "fprobe-ulog: a NetFlow probe. Version %s\n"
253                         "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
254                         "\n"
255                         "-h\t\tDisplay this help\n"
256                         "-U <mask>\tULOG group bitwise mask [1]\n"
257                         "-s <seconds>\tHow often scan for expired flows [5]\n"
258                         "-g <seconds>\tFragmented flow lifetime [30]\n"
259                         "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
260                         "-f <filename>\tLog flow data in a file\n"
261                         "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
262                         "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
263                         "-a <address>\tUse <address> as source for NetFlow flow\n"
264                         "-X <rules>\tInterface name to SNMP-index conversion rules\n"
265                         "-M\t\tUse netfilter mark value as ToS flag\n"
266                         "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
267                         "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
268                         "-q <flows>\tPending queue length [100]\n"
269                         "-B <kilobytes>\tKernel capture buffer size [0]\n"
270                         "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
271                         "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
272                         "-c <directory>\tDirectory to chroot to\n"
273                         "-u <user>\tUser to run as\n"
274                         "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
275                         "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
276                         "-y <remote:port>\tAddress of the NetFlow collector\n"
277                         "-f <writable file>\tFile to write data into\n"
278                         "-T <n>\tRotate log file every n epochs\n"
279                         "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
280                         "-E <[1..60]>\tSize of an epoch in minutes\n"
281                         "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
282                         ,
283                 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
284         exit(0);
285 }
286
#if ((DEBUG) & DEBUG_I)
/*
 * Dump the DEBUG_I statistics counters to the log at LOG_DEBUG level.
 *
 * All counters are unsigned, so they are printed with %u; the 64-bit byte
 * total is cast to unsigned long long to match %llu on every platform.
 */
void info_debug()
{
	my_log(LOG_DEBUG, "I: received:%u/%u (%llu) pending:%u/%u",
			pkts_total, pkts_total_fragmented,
			(unsigned long long) size_total,
			pkts_pending - pkts_pending_done, pending_queue_trace);
	my_log(LOG_DEBUG, "I: ignored:%u lost:%u+%u",
			pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
	my_log(LOG_DEBUG, "I: cache:%u/%u emit:%u/%u/%u",
			flows_total, flows_fragmented,
			(unsigned) emit_sequence, emit_pkts, emit_queue);
	my_log(LOG_DEBUG, "I: memory:%u/%u (%u)",
			total_elements, free_elements, total_memory);
}
#endif
301
302 void sighandler(int sig)
303 {
304         switch (sig) {
305                 case SIGTERM:
306                         sigs |= SIGTERM_MASK;
307                         break;
308 #if ((DEBUG) & DEBUG_I)
309                 case SIGUSR1:
310                         sigs |= SIGUSR1_MASK;
311                         break;
312 #endif
313         }
314 }
315
316 void gettime(struct Time *now)
317 {
318         struct timeval t;
319
320         gettimeofday(&t, 0);
321         now->sec = t.tv_sec;
322         now->usec = t.tv_usec;
323 }
324
325
/*
 * Difference t1 - t2 in whole minutes.  Only the seconds fields are used;
 * the microsecond parts are ignored.
 */
inline time_t cmpMtime(struct Time *t1, struct Time *t2)
{
	return (t1->sec - t2->sec)/60;
}
330
/*
 * Difference t1 - t2 in milliseconds, combining the seconds and
 * microseconds fields.  The result is negative-looking or positive
 * depending on which time is later; put_into() relies on its sign.
 */
inline time_t cmpmtime(struct Time *t1, struct Time *t2)
{
	return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
}
335
336 /* Uptime in miliseconds */
337 uint32_t getuptime(struct Time *t)
338 {
339         /* Maximum uptime is about 49/2 days */
340         return cmpmtime(t, &start_time);
341 }
342
343 /* Uptime in minutes */
344 uint32_t getuptime_minutes(struct Time *t)
345 {
346         /* Maximum uptime is about 49/2 days */
347         return cmpMtime(t, &start_time);
348 }
349
350 hash_t hash_flow(struct Flow *flow)
351 {
352         if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
353         else return hash(flow, sizeof(struct Flow_TL));
354 }
355
356 uint16_t snmp_index(char *name) {
357         uint32_t i;
358
359         if (!*name) return 0;
360
361         for (i = 0; (int) i < nsnmp_rules; i++) {
362                 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
363                 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
364         }
365
366         if ((i = if_nametoindex(name))) return i;
367
368         return -1;
369 }
370
371 inline void copy_flow(struct Flow *src, struct Flow *dst)
372 {
373         dst->iif = src->iif;
374         dst->oif = src->oif;
375         dst->sip = src->sip;
376         dst->dip = src->dip;
377         dst->tos = src->tos;
378         dst->xid = src->xid;
379         dst->proto = src->proto;
380         dst->tcp_flags = src->tcp_flags;
381         dst->id = src->id;
382         dst->sp = src->sp;
383         dst->dp = src->dp;
384         dst->pkts = src->pkts;
385         dst->size = src->size;
386         dst->sizeF = src->sizeF;
387         dst->sizeP = src->sizeP;
388         dst->ctime = src->ctime;
389         dst->mtime = src->mtime;
390         dst->flags = src->flags;
391 }
392
393 void read_cur_epoch() {
394         int fd;
395         /* Reset to -1 in case the read fails */
396         cur_epoch=-1;
397         fd = open(LAST_EPOCH_FILE, O_RDONLY);
398         if (fd != -1) {
399                 char snum[MAX_EPOCH_SIZE];
400                 ssize_t len;
401                 len = read(fd, snum, MAX_EPOCH_SIZE-1);
402                 if (len != -1) {
403                         snum[len]='\0';
404                         sscanf(snum,"%d",&cur_epoch);
405                         cur_epoch++; /* Let's not stone the last epoch */
406                         close(fd);
407                 }
408         }
409         return;
410 }
411
412
413 /* Dumps the current epoch in a file to cope with
414  * reboots and killings of fprobe */
415
416 void update_cur_epoch_file(int n) {
417         int fd, len;
418         char snum[MAX_EPOCH_SIZE];
419         len=snprintf(snum, MAX_EPOCH_SIZE-1,"%d", n);
420         fd = open(LAST_EPOCH_FILE, O_RDWR|O_CREAT|O_TRUNC);
421         if (fd == -1) {
422                 my_log(LOG_ERR, "open() failed: %s.The next restart will resume logging from epoch id 0.",LAST_EPOCH_FILE);
423                 return;
424         }
425         write(fd, snum, len);
426         close(fd);
427 }
428
429 /* Get the file descriptor corresponding to the current file.
430  * The kludgy implementation is to abstract away the 'current
431  * file descriptor', which may also be a socket. 
432  */
433
434 unsigned get_data_file_fd(char *fname, int cur_fd) {
435         struct Time now;
436         unsigned cur_uptime;
437
438         struct statfs statfs;
439         int ret_fd;
440
441         /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
442          * doesn't solve the problem */
443         gettime(&now);
444         cur_uptime = getuptime_minutes(&now);
445
446         if (cur_fd != START_DATA_FD) {
447                 if (fstatfs(cur_fd, &statfs) == -1) {
448                         my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
449                 }
450                 else {
451                         if (min_free && (statfs.f_bavail < min_free)
452                                         && (cur_epoch==last_peak))
453                         {
454                                 my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
455                                 cur_epoch = -1;
456                         }
457                         /*
458                            else 
459                            assume that we can reclaim space by overwriting our own files
460                            and that the difference in size will not fill the disk - sapan
461                            */
462                 }
463         }
464
465         /* If epoch length has been exceeded, 
466          * or we're starting up 
467          * or we're going back to the first epoch */
468         if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
469                 char nextname[MAX_PATH_LEN];
470                 int write_fd;
471                 prev_uptime = cur_uptime;
472                 cur_epoch = (cur_epoch + 1) % log_epochs;
473                 if (cur_epoch>last_peak) last_peak = cur_epoch;
474                 if (cur_fd>0)
475                         close(cur_fd);
476                 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
477                 if ((write_fd = open(nextname, O_RDWR|O_CREAT|O_TRUNC)) < 0) {
478                         my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
479                         exit(1);
480                 }
481                 if (fchmod(write_fd,S_IRUSR|S_IWUSR|S_IROTH|S_IRGRP) == -1) {
482                         my_log(LOG_ERR, "fchmod() failed: %s (%s). Continuing...\n", nextname, strerror(errno));
483                 }
484                 update_cur_epoch_file(cur_epoch);
485                 ret_fd = write_fd;
486         }
487         else {
488                 ret_fd = cur_fd;
489         }
490         return(ret_fd);
491 }
492
493 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
494 {
495         struct Flow **flowpp;
496
497 #ifdef WALL
498         flowpp = 0;
499 #endif
500
501         if (prev) flowpp = *prev;
502
503         while (where) {
504                 if (where->sip.s_addr == what->sip.s_addr
505                                 && where->dip.s_addr == what->dip.s_addr
506                                 && where->proto == what->proto) {
507                         switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
508                                 case 0:
509                                         /* Both unfragmented */
510                                         if ((what->sp == where->sp)
511                                                         && (what->dp == where->dp)) goto done;
512                                         break;
513                                 case 2:
514                                         /* Both fragmented */
515                                         if (where->id == what->id) goto done;
516                                         break;
517                         }
518                 }
519                 flowpp = &where->next;
520                 where = where->next;
521         }
522 done:
523         if (prev) *prev = flowpp;
524         return where;
525 }
526
527         int put_into(struct Flow *flow, int flag
528 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
529                         , char *logbuf
530 #endif
531                     )
532 {
533         int ret = 0;
534         hash_t h;
535         struct Flow *flown, **flowpp;
536 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
537         char buf[64];
538 #endif
539
540         h = hash_flow(flow);
541 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
542         sprintf(buf, " %x H:%04x", (unsigned) flow, h);
543         strcat(logbuf, buf);
544 #endif
545         pthread_mutex_lock(&flows_mutex[h]);
546         flowpp = &flows[h];
547         if (!(flown = find(flows[h], flow, &flowpp))) {
548                 /* No suitable flow found - add */
549                 if (flag == COPY_INTO) {
550                         if ((flown = mem_alloc())) {
551                                 copy_flow(flow, flown);
552                                 flow = flown;
553                         } else {
554 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
555                                 my_log(LOG_ERR, "%s %s. %s",
556                                                 "mem_alloc():", strerror(errno), "packet lost");
557 #endif
558                                 return -1;
559                         }
560                 }
561                 flow->next = flows[h];
562                 flows[h] = flow;
563 #if ((DEBUG) & DEBUG_I)
564                 flows_total++;
565                 if (flow->flags & FLOW_FRAG) flows_fragmented++;
566 #endif
567 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
568                 if (flown) {
569                         sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
570                         strcat(logbuf, buf);
571                 }
572 #endif
573         } else {
574                 /* Found suitable flow - update */
575 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
576                 sprintf(buf, " +> %x", (unsigned) flown);
577                 strcat(logbuf, buf);
578 #endif
579                 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
580                         flown->mtime = flow->mtime;
581                 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
582                         flown->ctime = flow->ctime;
583                 flown->tcp_flags |= flow->tcp_flags;
584                 flown->size += flow->size;
585                 flown->pkts += flow->pkts;
586                 if (flow->flags & FLOW_FRAG) {
587                         /* Fragmented flow require some additional work */
588                         if (flow->flags & FLOW_TL) {
589                                 /*
590                                    ?FIXME?
591                                    Several packets with FLOW_TL (attack)
592                                    */
593                                 flown->sp = flow->sp;
594                                 flown->dp = flow->dp;
595                         }
596                         if (flow->flags & FLOW_LASTFRAG) {
597                                 /*
598                                    ?FIXME?
599                                    Several packets with FLOW_LASTFRAG (attack)
600                                    */
601                                 flown->sizeP = flow->sizeP;
602                         }
603                         flown->flags |= flow->flags;
604                         flown->sizeF += flow->sizeF;
605                         if ((flown->flags & FLOW_LASTFRAG)
606                                         && (flown->sizeF >= flown->sizeP)) {
607                                 /* All fragments received - flow reassembled */
608                                 *flowpp = flown->next;
609                                 pthread_mutex_unlock(&flows_mutex[h]);
610 #if ((DEBUG) & DEBUG_I)
611                                 flows_total--;
612                                 flows_fragmented--;
613 #endif
614                                 flown->id = 0;
615                                 flown->flags &= ~FLOW_FRAG;
616 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
617                                 strcat(logbuf," R");
618 #endif
619                                 ret = put_into(flown, MOVE_INTO
620 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
621                                                 , logbuf
622 #endif
623                                               );
624                         }
625                 }
626                 if (flag == MOVE_INTO) mem_free(flow);
627         }
628         pthread_mutex_unlock(&flows_mutex[h]);
629         return ret;
630 }
631
632 int onlyonce=0;
633
634 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
635 {
636         int i;
637
638         for (i = 0; i < fields; i++) {
639 #if ((DEBUG) & DEBUG_F)
640                 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
641 #endif
642                 switch (format[i]) {
643                         case NETFLOW_IPV4_SRC_ADDR:
644                                 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
645                                 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
646                                 break;
647
648                         case NETFLOW_IPV4_DST_ADDR:
649                                 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
650                                 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
651                                         my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
652                                 }
653                                 p += NETFLOW_IPV4_DST_ADDR_SIZE;
654                                 break;
655
656                         case NETFLOW_INPUT_SNMP:
657                                 *((uint16_t *) p) = htons(flow->iif);
658                                 p += NETFLOW_INPUT_SNMP_SIZE;
659                                 break;
660
661                         case NETFLOW_OUTPUT_SNMP:
662                                 *((uint16_t *) p) = htons(flow->oif);
663                                 p += NETFLOW_OUTPUT_SNMP_SIZE;
664                                 break;
665
666                         case NETFLOW_PKTS_32:
667                                 *((uint32_t *) p) = htonl(flow->pkts);
668                                 p += NETFLOW_PKTS_32_SIZE;
669                                 break;
670
671                         case NETFLOW_BYTES_32:
672                                 *((uint32_t *) p) = htonl(flow->size);
673                                 p += NETFLOW_BYTES_32_SIZE;
674                                 break;
675
676                         case NETFLOW_FIRST_SWITCHED:
677                                 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
678                                 p += NETFLOW_FIRST_SWITCHED_SIZE;
679                                 break;
680
681                         case NETFLOW_LAST_SWITCHED:
682                                 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
683                                 p += NETFLOW_LAST_SWITCHED_SIZE;
684                                 break;
685
686                         case NETFLOW_L4_SRC_PORT:
687                                 *((uint16_t *) p) = flow->sp;
688                                 p += NETFLOW_L4_SRC_PORT_SIZE;
689                                 break;
690
691                         case NETFLOW_L4_DST_PORT:
692                                 *((uint16_t *) p) = flow->dp;
693                                 p += NETFLOW_L4_DST_PORT_SIZE;
694                                 break;
695
696                         case NETFLOW_PROT:
697                                 *((uint8_t *) p) = flow->proto;
698                                 p += NETFLOW_PROT_SIZE;
699                                 break;
700
701                         case NETFLOW_SRC_TOS:
702                                 *((uint8_t *) p) = flow->tos;
703                                 p += NETFLOW_SRC_TOS_SIZE;
704                                 break;
705
706                         case NETFLOW_TCP_FLAGS:
707                                 *((uint8_t *) p) = flow->tcp_flags;
708                                 p += NETFLOW_TCP_FLAGS_SIZE;
709                                 break;
710
711                         case NETFLOW_VERSION:
712                                 *((uint16_t *) p) = htons(netflow->Version);
713                                 p += NETFLOW_VERSION_SIZE;
714                                 break;
715
716                         case NETFLOW_COUNT:
717                                 *((uint16_t *) p) = htons(emit_count);
718                                 p += NETFLOW_COUNT_SIZE;
719                                 break;
720
721                         case NETFLOW_UPTIME:
722                                 *((uint32_t *) p) = htonl(getuptime(&emit_time));
723                                 p += NETFLOW_UPTIME_SIZE;
724                                 break;
725
726                         case NETFLOW_UNIX_SECS:
727                                 *((uint32_t *) p) = htonl(emit_time.sec);
728                                 p += NETFLOW_UNIX_SECS_SIZE;
729                                 break;
730
731                         case NETFLOW_UNIX_NSECS:
732                                 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
733                                 p += NETFLOW_UNIX_NSECS_SIZE;
734                                 break;
735
736                         case NETFLOW_FLOW_SEQUENCE:
737                                 //*((uint32_t *) p) = htonl(emit_sequence);
738                                 *((uint32_t *) p) = 0;
739                                 p += NETFLOW_FLOW_SEQUENCE_SIZE;
740                                 break;
741
742                         case NETFLOW_PAD8:
743                                 /* Unsupported (uint8_t) */
744                         case NETFLOW_ENGINE_TYPE:
745                         case NETFLOW_ENGINE_ID:
746                         case NETFLOW_FLAGS7_1:
747                         case NETFLOW_SRC_MASK:
748                         case NETFLOW_DST_MASK:
749                                 if (onlyonce) {
750                                         my_log(LOG_CRIT, "Adding SRC/DST masks: this version of fprobe is seriously broken\n");
751                                         onlyonce=1;
752                                 }
753                                 *((uint8_t *) p) = 0;
754                                 p += NETFLOW_PAD8_SIZE;
755                                 break;
756                         case NETFLOW_XID:
757                                 *((uint32_t *) p) = flow->xid;
758                                 p += NETFLOW_XID_SIZE;
759                                 break;
760                         case NETFLOW_PAD16:
761                                 /* Unsupported (uint16_t) */
762                         case NETFLOW_SRC_AS:
763                         case NETFLOW_DST_AS:
764                         case NETFLOW_FLAGS7_2:
765                                 *((uint16_t *) p) = 0;
766                                 p += NETFLOW_PAD16_SIZE;
767                                 break;
768
769                         case NETFLOW_PAD32:
770                                 /* Unsupported (uint32_t) */
771                         case NETFLOW_IPV4_NEXT_HOP:
772                         case NETFLOW_ROUTER_SC:
773                                 *((uint32_t *) p) = 0;
774                                 p += NETFLOW_PAD32_SIZE;
775                                 break;
776
777                         default:
778                                 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
779                                                 format, i, format[i]);
780                                 exit(1);
781                 }
782         }
783 #if ((DEBUG) & DEBUG_F)
784         my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
785 #endif
786         return p;
787 }
788
789 void setuser() {
790         /*
791            Workaround for clone()-based threads
792            Try to change EUID independently of main thread
793            */
794         if (pw) {
795                 setgroups(0, NULL);
796                 setregid(pw->pw_gid, pw->pw_gid);
797                 setreuid(pw->pw_uid, pw->pw_uid);
798         }
799 }
800
801 void *emit_thread()
802 {
803         struct Flow *flow;
804         void *p;
805         struct timeval now;
806         struct timespec timeout;
807         int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
808
809         p = (void *) &emit_packet + netflow->HeaderSize;
810         timeout.tv_nsec = 0;
811
812         setuser();
813
814         for (;;) {
815                 pthread_mutex_lock(&emit_mutex);
816                 while (!flows_emit) {
817                         gettimeofday(&now, 0);
818                         timeout.tv_sec = now.tv_sec + emit_timeout;
819                         /* Do not wait until emit_packet will filled - it may be too long */
820                         if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
821                                 pthread_mutex_unlock(&emit_mutex);
822                                 goto sendit;
823                         }
824                 }
825                 flow = flows_emit;
826                 flows_emit = flows_emit->next;
827 #if ((DEBUG) & DEBUG_I)
828                 emit_queue--;
829 #endif          
830                 pthread_mutex_unlock(&emit_mutex);
831
832 #ifdef UPTIME_TRICK
833                 if (!emit_count) {
834                         gettime(&start_time);
835                         start_time.sec -= start_time_offset;
836                 }
837 #endif
838                 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
839                 mem_free(flow);
840                 emit_count++;
841 #ifdef PF2_DEBUG
842                 printf("Emit count = %d\n", emit_count);
843                 fflush(stdout);
844 #endif
845                 if (emit_count == netflow->MaxFlows) {
846 sendit:
847                         gettime(&emit_time);
848                         p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
849                         size = netflow->HeaderSize + emit_count * netflow->FlowSize;
850                         /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
851 #ifdef STD_NETFLOW_PDU
852                         if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
853 #endif
854                         peer_rot_cur = 0;
855                         for (i = 0; i < npeers; i++) {
856                                 if (peers[i].type == PEER_FILE) {
857                                         if (netflow->SeqOffset)
858                                                 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
859                                         peers[i].write_fd = get_data_file_fd(peers[i].fname, peers[i].write_fd);
860                                         ret = write(peers[i].write_fd, emit_packet, size);
861                                         if (ret < size) {
862
863 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
864                                                 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
865                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
866 #endif
867 #undef MESSAGES
868                                         }
869 #if ((DEBUG) & DEBUG_E)
870                                         commaneelse {
871                                                 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
872                                                                 emit_count, i + 1, peers[i].seq);
873                                         }
874 #endif
875                                         peers[i].seq += emit_count;
876
877                                         /* Rate limit */
878                                         if (emit_rate_bytes) {
879                                                 sent += size;
880                                                 delay = sent / emit_rate_bytes;
881                                                 if (delay) {
882                                                         sent %= emit_rate_bytes;
883                                                         timeout.tv_sec = 0;
884                                                         timeout.tv_nsec = emit_rate_delay * delay;
885                                                         while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
886                                                 }
887                                         }
888                                 }
889                                 else
890                                         if (peers[i].type == PEER_MIRROR) goto sendreal;
891                                         else
892                                                 if (peers[i].type == PEER_ROTATE) 
893                                                         if (peer_rot_cur++ == peer_rot_work) {
894 sendreal:
895                                                                 if (netflow->SeqOffset)
896                                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
897                                                                 ret = send(peers[i].write_fd, emit_packet, size, 0);
898                                                                 if (ret < size) {
899 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
900                                                                         my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
901                                                                                         i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
902 #endif
903                                                                 }
904 #if ((DEBUG) & DEBUG_E)
905                                                                 commaneelse {
906                                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
907                                                                                         emit_count, i + 1, peers[i].seq);
908                                                                 }
909 #endif
910                                                                 peers[i].seq += emit_count;
911
912                                                                 /* Rate limit */
913                                                                 if (emit_rate_bytes) {
914                                                                         sent += size;
915                                                                         delay = sent / emit_rate_bytes;
916                                                                         if (delay) {
917                                                                                 sent %= emit_rate_bytes;
918                                                                                 timeout.tv_sec = 0;
919                                                                                 timeout.tv_nsec = emit_rate_delay * delay;
920                                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
921                                                                         }
922                                                                 }
923                                                         }
924                         }
925                         if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
926                         emit_sequence += emit_count;
927                         emit_count = 0;
928 #if ((DEBUG) & DEBUG_I)
929                         emit_pkts++;
930 #endif
931                 }
932         }
933 }       
934
935 void *unpending_thread()
936 {
937         struct timeval now;
938         struct timespec timeout;
939 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
940         char logbuf[256];
941 #endif
942
943         setuser();
944
945         timeout.tv_nsec = 0;
946         pthread_mutex_lock(&unpending_mutex);
947
948         for (;;) {
949                 while (!(pending_tail->flags & FLOW_PENDING)) {
950                         gettimeofday(&now, 0);
951                         timeout.tv_sec = now.tv_sec + unpending_timeout;
952                         pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
953                 }
954
955 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
956                 *logbuf = 0;
957 #endif
958                 if (put_into(pending_tail, COPY_INTO
959 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
960                                         , logbuf
961 #endif
962                             ) < 0) {
963 #if ((DEBUG) & DEBUG_I)
964                         pkts_lost_unpending++;
965 #endif                          
966                 }
967
968 #if ((DEBUG) & DEBUG_U)
969                 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
970 #endif
971
972                 pending_tail->flags = 0;
973                 pending_tail = pending_tail->next;
974 #if ((DEBUG) & DEBUG_I)
975                 pkts_pending_done++;
976 #endif
977         }
978 }
979
980 void *scan_thread()
981 {
982 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
983         char logbuf[256];
984 #endif
985         int i;
986         struct Flow *flow, **flowpp;
987         struct Time now;
988         struct timespec timeout;
989
990         setuser();
991
992         timeout.tv_nsec = 0;
993         pthread_mutex_lock(&scan_mutex);
994
995         for (;;) {
996                 gettime(&now);
997                 timeout.tv_sec = now.sec + scan_interval;
998                 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
999
1000                 gettime(&now);
1001 #if ((DEBUG) & DEBUG_S)
1002                 my_log(LOG_DEBUG, "S: %d", now.sec);
1003 #endif
1004                 for (i = 0; i < 1 << HASH_BITS ; i++) {
1005                         pthread_mutex_lock(&flows_mutex[i]);
1006                         flow = flows[i];
1007                         flowpp = &flows[i];
1008                         while (flow) {
1009                                 if (flow->flags & FLOW_FRAG) {
1010                                         /* Process fragmented flow */
1011                                         if ((now.sec - flow->mtime.sec) > frag_lifetime) {
1012                                                 /* Fragmented flow expired - put it into special chain */
1013 #if ((DEBUG) & DEBUG_I)
1014                                                 flows_fragmented--;
1015                                                 flows_total--;
1016 #endif
1017                                                 *flowpp = flow->next;
1018                                                 flow->id = 0;
1019                                                 flow->flags &= ~FLOW_FRAG;
1020                                                 flow->next = scan_frag_dreg;
1021                                                 scan_frag_dreg = flow;
1022                                                 flow = *flowpp;
1023                                                 continue;
1024                                         }
1025                                 } else {
1026                                         /* Flow is not frgamented */
1027                                         if ((now.sec - flow->mtime.sec) > inactive_lifetime
1028                                                         || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
1029                                                 /* Flow expired */
1030 #if ((DEBUG) & DEBUG_S)
1031                                                 my_log(LOG_DEBUG, "S: E %x", flow);
1032 #endif
1033 #if ((DEBUG) & DEBUG_I)
1034                                                 flows_total--;
1035 #endif
1036                                                 *flowpp = flow->next;
1037                                                 pthread_mutex_lock(&emit_mutex);
1038                                                 flow->next = flows_emit;
1039                                                 flows_emit = flow;
1040 #if ((DEBUG) & DEBUG_I)
1041                                                 emit_queue++;
1042 #endif                          
1043                                                 pthread_mutex_unlock(&emit_mutex);
1044                                                 flow = *flowpp;
1045                                                 continue;
1046                                         }
1047                                 }
1048                                 flowpp = &flow->next;
1049                                 flow = flow->next;
1050                         } /* chain loop */
1051                         pthread_mutex_unlock(&flows_mutex[i]);
1052                 } /* hash loop */
1053                 if (flows_emit) pthread_cond_signal(&emit_cond);
1054
1055                 while (scan_frag_dreg) {
1056                         flow = scan_frag_dreg;
1057                         scan_frag_dreg = flow->next;
1058 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1059                         *logbuf = 0;
1060 #endif
1061                         put_into(flow, MOVE_INTO
1062 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1063                                         , logbuf
1064 #endif
1065                                 );
1066 #if ((DEBUG) & DEBUG_S)
1067                         my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
1068 #endif
1069                 }
1070         }
1071 }
1072
1073 void *cap_thread()
1074 {
1075         struct ulog_packet_msg *ulog_msg;
1076         struct ip *nl;
1077         void *tl;
1078         struct Flow *flow;
1079         int len, off_frag, psize;
1080 #if ((DEBUG) & DEBUG_C)
1081         char buf[64];
1082         char logbuf[256];
1083 #endif
1084         int challenge;
1085
1086         setuser();
1087
1088         while (!killed) {
1089                 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1090                 if (len <= 0) {
1091                         my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1092                         continue;
1093                 }
1094                 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1095
1096 #if ((DEBUG) & DEBUG_C)
1097                         sprintf(logbuf, "C: %d", ulog_msg->data_len);
1098 #endif
1099
1100                         nl = (void *) &ulog_msg->payload;
1101                         psize = ulog_msg->data_len;
1102
1103                         /* Sanity check */
1104                         if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1105 #if ((DEBUG) & DEBUG_C)
1106                                 strcat(logbuf, " U");
1107                                 my_log(LOG_DEBUG, "%s", logbuf);
1108 #endif
1109 #if ((DEBUG) & DEBUG_I)
1110                                 pkts_ignored++;
1111 #endif
1112                                 continue;
1113                         }
1114
1115                         if (pending_head->flags) {
1116 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1117                                 my_log(LOG_ERR,
1118 # if ((DEBUG) & DEBUG_C)
1119                                                 "%s %s %s", logbuf,
1120 # else
1121                                                 "%s %s",
1122 # endif
1123                                                 "pending queue full:", "packet lost");
1124 #endif
1125 #if ((DEBUG) & DEBUG_I)
1126                                 pkts_lost_capture++;
1127 #endif
1128                                 goto done;
1129                         }
1130
1131 #if ((DEBUG) & DEBUG_I)
1132                         pkts_total++;
1133 #endif
1134
1135                         flow = pending_head;
1136
1137                         /* ?FIXME? Add sanity check for ip_len? */
1138                         flow->size = ntohs(nl->ip_len);
1139 #if ((DEBUG) & DEBUG_I)
1140                         size_total += flow->size;
1141 #endif
1142
1143                         flow->sip = nl->ip_src;
1144                         flow->dip = nl->ip_dst;
1145                         flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1146                         
1147                         /* It's going to be expensive calling this syscall on every flow.
1148                          * We should keep a local hash table, for now just bear the overhead... - Sapan*/
1149                         if (ulog_msg->mark > 0) {
1150                                 flow->xid = get_vhi_name(ulog_msg->mark);
1151                                 challenge = get_vhi_name(ulog_msg->mark);
1152                         }
1153
1154                         if (flow->xid < 1 || flow->xid!=challenge) 
1155                                 flow->xid = ulog_msg->mark;
1156
1157
1158                         if ((flow->dip.s_addr == inet_addr("64.34.177.39")) || (flow->sip.s_addr == inet_addr("64.34.177.39"))) {
1159                                 my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->xid);
1160                         }
1161                         flow->iif = snmp_index(ulog_msg->indev_name);
1162                         flow->oif = snmp_index(ulog_msg->outdev_name);
1163                         flow->proto = nl->ip_p;
1164                         flow->id = 0;
1165                         flow->tcp_flags = 0;
1166                         flow->pkts = 1;
1167                         flow->sizeF = 0;
1168                         flow->sizeP = 0;
1169                         /* Packets captured from OUTPUT table didn't contains valid timestamp */
1170                         if (ulog_msg->timestamp_sec) {
1171                                 flow->ctime.sec = ulog_msg->timestamp_sec;
1172                                 flow->ctime.usec = ulog_msg->timestamp_usec;
1173                         } else gettime(&flow->ctime);
1174                         flow->mtime = flow->ctime;
1175
1176                         off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1177
1178                         /*
1179                            Offset (from network layer) to transport layer header/IP data
1180                            IOW IP header size ;-)
1181
1182                            ?FIXME?
1183                            Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1184                            */
1185                         off_tl = nl->ip_hl << 2;
1186                         tl = (void *) nl + off_tl;
1187
1188                         /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1189                         flow->sizeF = ntohs(nl->ip_len) - off_tl;
1190                         psize -= off_tl;
1191                         if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1192                         if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1193
1194                         if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1195                                 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1196 #if ((DEBUG) & DEBUG_C)
1197                                 strcat(logbuf, " F");
1198 #endif
1199 #if ((DEBUG) & DEBUG_I)
1200                                 pkts_total_fragmented++;
1201 #endif
1202                                 flow->flags |= FLOW_FRAG;
1203                                 flow->id = nl->ip_id;
1204
1205                                 if (!(ntohs(nl->ip_off) & IP_MF)) {
1206                                         /* Packet whith IP_MF contains information about whole datagram size */
1207                                         flow->flags |= FLOW_LASTFRAG;
1208                                         /* size = frag_offset*8 + data_size */
1209                                         flow->sizeP = off_frag + flow->sizeF;
1210                                 }
1211                         }
1212
1213 #if ((DEBUG) & DEBUG_C)
1214                         sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1215                         strcat(logbuf, buf);
1216                         sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1217                         strcat(logbuf, buf);
1218 #endif
1219
1220                         /*
1221                            Fortunately most interesting transport layer information fit
1222                            into first 8 bytes of IP data field (minimal nonzero size).
1223                            Thus we don't need actual packet reassembling to build whole
1224                            transport layer data. We only check the fragment offset for
1225                            zero value to find packet with this information.
1226                            */
1227                         if (!off_frag && psize >= 8) {
1228                                 switch (flow->proto) {
1229                                         case IPPROTO_TCP:
1230                                         case IPPROTO_UDP:
1231                                                 flow->sp = ((struct udphdr *)tl)->uh_sport;
1232                                                 flow->dp = ((struct udphdr *)tl)->uh_dport;
1233                                                 goto tl_known;
1234
1235 #ifdef ICMP_TRICK
1236                                         case IPPROTO_ICMP:
1237                                                 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1238                                                 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1239                                                 goto tl_known;
1240 #endif
1241 #ifdef ICMP_TRICK_CISCO
1242                                         case IPPROTO_ICMP:
1243                                                 flow->dp = *((int32_t *) tl);
1244                                                 goto tl_known;
1245 #endif
1246
1247                                         default:
1248                                                 /* Unknown transport layer */
1249 #if ((DEBUG) & DEBUG_C)
1250                                                 strcat(logbuf, " U");
1251 #endif
1252                                                 flow->sp = 0;
1253                                                 flow->dp = 0;
1254                                                 break;
1255
1256 tl_known:
1257 #if ((DEBUG) & DEBUG_C)
1258                                                 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1259                                                 strcat(logbuf, buf);
1260 #endif
1261                                                 flow->flags |= FLOW_TL;
1262                                 }
1263                         }
1264
1265                         /* Check for tcp flags presence (including CWR and ECE). */
1266                         if (flow->proto == IPPROTO_TCP
1267                                         && off_frag < 16
1268                                         && psize >= 16 - off_frag) {
1269                                 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1270 #if ((DEBUG) & DEBUG_C)
1271                                 sprintf(buf, " TCP:%x", flow->tcp_flags);
1272                                 strcat(logbuf, buf);
1273 #endif
1274                         }
1275
1276 #if ((DEBUG) & DEBUG_C)
1277                         sprintf(buf, " => %x", (unsigned) flow);
1278                         strcat(logbuf, buf);
1279                         my_log(LOG_DEBUG, "%s", logbuf);
1280 #endif
1281
1282 #if ((DEBUG) & DEBUG_I)
1283                         pkts_pending++;
1284                         pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1285                         if (pending_queue_trace < pending_queue_trace_candidate)
1286                                 pending_queue_trace = pending_queue_trace_candidate;
1287 #endif
1288
1289                         /* Flow complete - inform unpending_thread() about it */
1290                         pending_head->flags |= FLOW_PENDING;
1291                         pending_head = pending_head->next;
1292 done:
1293                         pthread_cond_signal(&unpending_cond);
1294                 }
1295         }
1296         return 0;
1297 }
1298
1299 /* Copied out of CoDemux */
1300
1301 static int init_daemon() {
1302         pid_t pid;
1303         FILE *pidfile;
1304
1305         pidfile = fopen(PIDFILE, "w");
1306         if (pidfile == NULL) {
1307                 my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
1308         }
1309
1310         if ((pid = fork()) < 0) {
1311                 fclose(pidfile);
1312                 my_log(LOG_ERR, "Could not fork!\n");
1313                 return(-1);
1314         }
1315         else if (pid != 0) {
1316                 /* i'm the parent, writing down the child pid  */
1317                 fprintf(pidfile, "%u\n", pid);
1318                 fclose(pidfile);
1319                 exit(0);
1320         }
1321
1322         /* close the pid file */
1323         fclose(pidfile);
1324
1325         /* routines for any daemon process
1326            1. create a new session 
1327            2. change directory to the root
1328            3. change the file creation permission 
1329            */
1330         setsid();
1331         chdir("/var/local/fprobe");
1332         umask(0);
1333
1334         return(0);
1335 }
1336
1337 int main(int argc, char **argv)
1338 {
1339         char errpbuf[512];
1340         char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1341         int c, i, write_fd, memory_limit = 0;
1342         struct addrinfo hints, *res;
1343         struct sockaddr_in saddr;
1344         pthread_attr_t tattr;
1345         struct sigaction sigact;
1346         static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1347         struct timeval timeout;
1348
1349         sched_min = sched_get_priority_min(SCHED);
1350         sched_max = sched_get_priority_max(SCHED);
1351
1352         memset(&saddr, 0 , sizeof(saddr));
1353         memset(&hints, 0 , sizeof(hints));
1354         hints.ai_flags = AI_PASSIVE;
1355         hints.ai_family = AF_INET;
1356         hints.ai_socktype = SOCK_DGRAM;
1357
1358         /* Process command line options */
1359
1360         opterr = 0;
1361         while ((c = my_getopt(argc, argv, parms)) != -1) {
1362                 switch (c) {
1363                         case '?':
1364                                 usage();
1365
1366                         case 'h':
1367                                 usage();
1368                 }
1369         }
1370
1371         if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1372         if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1373         if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1374         if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1375         if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1376         if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1377         if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1378         if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1379         if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1380         if (parms[nflag].count) {
1381                 switch (atoi(parms[nflag].arg)) {
1382                         case 1:
1383                                 netflow = &NetFlow1;
1384                                 break;
1385
1386                         case 5:
1387                                 break;
1388
1389                         case 7:
1390                                 netflow = &NetFlow7;
1391                                 break;
1392
1393                         default:
1394                                 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1395                                 exit(1);
1396                 }
1397         }
1398         if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1399         if (parms[lflag].count) {
1400                 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1401                         *log_suffix++ = 0;
1402                         if (*log_suffix) {
1403                                 sprintf(errpbuf, "[%s]", log_suffix);
1404                                 strcat(ident, errpbuf);
1405                         }
1406                 }
1407                 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1408                 if (log_suffix) *--log_suffix = ':';
1409         }
1410         if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1411 err_malloc:
1412                 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1413                 exit(1);
1414         }
1415         sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1416         if (parms[qflag].count) {
1417                 pending_queue_length = atoi(parms[qflag].arg);
1418                 if (pending_queue_length < 1) {
1419                         fprintf(stderr, "Illegal %s\n", "pending queue length");
1420                         exit(1);
1421                 }
1422         }
1423         if (parms[rflag].count) {
1424                 schedp.sched_priority = atoi(parms[rflag].arg);
1425                 if (schedp.sched_priority
1426                                 && (schedp.sched_priority < sched_min
1427                                         || schedp.sched_priority > sched_max)) {
1428                         fprintf(stderr, "Illegal %s\n", "realtime priority");
1429                         exit(1);
1430                 }
1431         }
1432         if (parms[Bflag].count) {
1433                 sockbufsize = atoi(parms[Bflag].arg) << 10;
1434         }
1435         if (parms[bflag].count) {
1436                 bulk_quantity = atoi(parms[bflag].arg);
1437                 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1438                         fprintf(stderr, "Illegal %s\n", "bulk size");
1439                         exit(1);
1440                 }
1441         }
1442         if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1443         if (parms[Xflag].count) {
1444                 for(i = 0; parms[Xflag].arg[i]; i++)
1445                         if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1446                 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1447                         goto err_malloc;
1448                 rule = strtok(parms[Xflag].arg, ":");
1449                 for (i = 0; rule; i++) {
1450                         snmp_rules[i].len = strlen(rule);
1451                         if (snmp_rules[i].len > IFNAMSIZ) {
1452                                 fprintf(stderr, "Illegal %s\n", "interface basename");
1453                                 exit(1);
1454                         }
1455                         strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1456                         if (!*(rule - 1)) *(rule - 1) = ',';
1457                         rule = strtok(NULL, ",");
1458                         if (!rule) {
1459                                 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1460                                 exit(1);
1461                         }
1462                         snmp_rules[i].base = atoi(rule);
1463                         *(rule - 1) = ':';
1464                         rule = strtok(NULL, ":");
1465                 }
1466                 nsnmp_rules = i;
1467         }
1468         if (parms[tflag].count)
1469                 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1470         if (parms[aflag].count) {
1471                 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1472 bad_lhost:
1473                         fprintf(stderr, "Illegal %s\n", "source address");
1474                         exit(1);
1475                 } else {
1476                         saddr = *((struct sockaddr_in *) res->ai_addr);
1477                         freeaddrinfo(res);
1478                 }
1479         }
1480         if (parms[uflag].count) 
1481                 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1482                         fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1483                         exit(1);
1484                 }
1485
1486
1487         /* Process collectors parameters. Brrrr... :-[ */
1488
1489         npeers = argc - optind;
1490         if (npeers > 1) {
1491                 /* Send to remote Netflow collector */
1492                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1493                 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1494                         dhost = argv[i];
1495                         if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1496                         *dport++ = 0;
1497                         if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1498                                 fprintf(stderr, "socket(): %s\n", strerror(errno));
1499                                 exit(1);
1500                         }
1501                         peers[npeers].write_fd = write_fd;
1502                         peers[npeers].type = PEER_MIRROR;
1503                         peers[npeers].laddr = saddr;
1504                         peers[npeers].seq = 0;
1505                         if ((lhost = strchr(dport, '/'))) {
1506                                 *lhost++ = 0;
1507                                 if ((type = strchr(lhost, '/'))) {
1508                                         *type++ = 0;
1509                                         switch (*type) {
1510                                                 case 0:
1511                                                 case 'm':
1512                                                         break;
1513
1514                                                 case 'r':
1515                                                         peers[npeers].type = PEER_ROTATE;
1516                                                         npeers_rot++;
1517                                                         break;
1518
1519                                                 default:
1520                                                         goto bad_collector;
1521                                         }
1522                                 }
1523                                 if (*lhost) {
1524                                         if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1525                                         peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1526                                         freeaddrinfo(res);
1527                                 }
1528                         }
1529                         if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1530                                                 sizeof(struct sockaddr_in))) {
1531                                 fprintf(stderr, "bind(): %s\n", strerror(errno));
1532                                 exit(1);
1533                         }
1534                         if (getaddrinfo(dhost, dport, &hints, &res)) {
1535 bad_collector:
1536                                 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1537                                 exit(1);
1538                         }
1539                         peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1540                         freeaddrinfo(res);
1541                         if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1542                                                 sizeof(struct sockaddr_in))) {
1543                                 fprintf(stderr, "connect(): %s\n", strerror(errno));
1544                                 exit(1);
1545                         }
1546
1547                         /* Restore command line */
1548                         if (type) *--type = '/';
1549                         if (lhost) *--lhost = '/';
1550                         *--dport = ':';
1551                 }
1552         }
1553         else if (parms[fflag].count) {
1554                 // log into a file
1555                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1556                 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1557                 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1558
1559                 peers[npeers].write_fd = START_DATA_FD;
1560                 peers[npeers].type = PEER_FILE;
1561                 peers[npeers].seq = 0;
1562
1563                 read_cur_epoch();
1564                 npeers++;
1565         }
1566         else 
1567                 usage();
1568
1569
1570         if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1571         ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1572         if (!ulog_handle) {
1573                 fprintf(stderr, "libipulog initialization error: %s",
1574                                 ipulog_strerror(ipulog_errno));
1575                 exit(1);
1576         }
1577         if (sockbufsize)
1578                 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1579                                         &sockbufsize, sizeof(sockbufsize)) < 0)
1580                         fprintf(stderr, "setsockopt(): %s", strerror(errno));
1581
1582         /* Daemonize (only if the log destination does not include stdout) */
1583
1584         my_log_open(ident, verbosity, log_dest);
1585
1586         init_daemon();
1587
1588         if (!(log_dest & 2)) {
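1589                 /* Crash-proofing: the parent stays behind, waits for the child, and forks a new one each time the previous child exits - Sapan */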
1590                 while (1) {
1591                         int pid=fork();
1592                         if (pid==-1) {
1593                                 fprintf(stderr, "fork(): %s", strerror(errno));
1594                                 exit(1);
1595                         }
1596                         else if (pid==0) {
1597                                 setsid();
1598                                 freopen("/dev/null", "r", stdin);
1599                                 freopen("/dev/null", "w", stdout);
1600                                 freopen("/dev/null", "w", stderr);
1601                                 break;
1602                         }
1603                         else {
1604                                 while (wait3(NULL,0,NULL) < 1);
1605                         }
1606                 }
1607         } else {
1608                 setvbuf(stdout, (char *)0, _IONBF, 0);
1609                 setvbuf(stderr, (char *)0, _IONBF, 0);
1610         }
1611
1612         pid = getpid();
1613         sprintf(errpbuf, "[%ld]", (long) pid);
1614         strcat(ident, errpbuf);
1615
1616         /* Initialization */
1617
1618         hash_init(); /* Actually for crc16 only */
1619         mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1620         for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1621
1622 #ifdef UPTIME_TRICK
1623         /* Hope 12 days is enough :-/ */
1624         start_time_offset = 1 << 20;
1625
1626         /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1627 #endif
1628         gettime(&start_time);
1629
1630         /*
1631            Build static pending queue as circular buffer.
1632            */
1633         if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1634         pending_tail = pending_head;
1635         for (i = pending_queue_length - 1; i--;) {
1636                 if (!(pending_tail->next = mem_alloc())) {
1637 err_mem_alloc:
1638                         my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1639                         exit(1);
1640                 }
1641                 pending_tail = pending_tail->next;
1642         }
1643         pending_tail->next = pending_head;
1644         pending_tail = pending_head;
1645
1646         sigemptyset(&sig_mask);
1647         sigact.sa_handler = &sighandler;
1648         sigact.sa_mask = sig_mask;
1649         sigact.sa_flags = 0;
1650         sigaddset(&sig_mask, SIGTERM);
1651         sigaction(SIGTERM, &sigact, 0);
1652 #if ((DEBUG) & DEBUG_I)
1653         sigaddset(&sig_mask, SIGUSR1);
1654         sigaction(SIGUSR1, &sigact, 0);
1655 #endif
1656         if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1657                 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1658                 exit(1);
1659         }
1660
1661         my_log(LOG_INFO, "Starting %s...", VERSION);
1662
1663         if (parms[cflag].count) {
1664                 if (chdir(parms[cflag].arg) || chroot(".")) {
1665                         my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1666                         exit(1);
1667                 }
1668         }
1669
1670         schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1671         pthread_attr_init(&tattr);
1672         for (i = 0; i < THREADS - 1; i++) {
1673                 if (schedp.sched_priority > 0) {
1674                         if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1675                                         (pthread_attr_setschedparam(&tattr, &schedp))) {
1676                                 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1677                                 exit(1);
1678                         }
1679                 }
1680                 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1681                         my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1682                         exit(1);
1683                 }
1684                 pthread_detach(thid);
1685                 schedp.sched_priority++;
1686         }
1687
1688         if (pw) {
1689                 if (setgroups(0, NULL)) {
1690                         my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1691                         exit(1);
1692                 }
1693                 if (setregid(pw->pw_gid, pw->pw_gid)) {
1694                         my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1695                         exit(1);
1696                 }
1697                 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1698                         my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1699                         exit(1);
1700                 }
1701         }
1702
1703         if (!(pidfile = fopen(pidfilepath, "w")))
1704                 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1705         else {
1706                 fprintf(pidfile, "%ld\n", (long) pid);
1707                 fclose(pidfile);
1708         }
1709
1710         my_log(LOG_INFO, "pid: %d", pid);
1711         my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1712                         "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1713                         ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1714                         netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1715                         memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1716                         emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1717                         parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1718         for (i = 0; i < nsnmp_rules; i++) {
1719                 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1720                                 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1721         }
1722         for (i = 0; i < npeers; i++) {
1723                 switch (peers[i].type) {
1724                         case PEER_MIRROR:
1725                                 c = 'm';
1726                                 break;
1727                         case PEER_ROTATE:
1728                                 c = 'r';
1729                                 break;
1730                 }
1731                 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1732                 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1733                                 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1734         }
1735
1736         pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1737
1738         timeout.tv_usec = 0;
1739         while (!killed
1740                         || (total_elements - free_elements - pending_queue_length)
1741                         || emit_count
1742                         || pending_tail->flags) {
1743
1744                 if (!sigs) {
1745                         timeout.tv_sec = scan_interval;
1746                         select(0, 0, 0, 0, &timeout);
1747                 }
1748
1749                 if (sigs & SIGTERM_MASK && !killed) {
1750                         sigs &= ~SIGTERM_MASK;
1751                         my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1752                         scan_interval = 1;
1753                         frag_lifetime = -1;
1754                         active_lifetime = -1;
1755                         inactive_lifetime = -1;
1756                         emit_timeout = 1;
1757                         unpending_timeout = 1;
1758                         killed = 1;
1759                         pthread_cond_signal(&scan_cond);
1760                         pthread_cond_signal(&unpending_cond);
1761                 }
1762
1763 #if ((DEBUG) & DEBUG_I)
1764                 if (sigs & SIGUSR1_MASK) {
1765                         sigs &= ~SIGUSR1_MASK;
1766                         info_debug();
1767                 }
1768 #endif
1769         }
1770         remove(pidfilepath);
1771 #if ((DEBUG) & DEBUG_I)
1772         info_debug();
1773 #endif
1774         my_log(LOG_INFO, "Done.");
1775 #ifdef WALL
1776         return 0;
1777 #endif
1778 }