[fprobe-ulog.git] / src / fprobe-ulog.c
1 /*
2         Copyright (C) Slava Astashonok <sla@0n.ru>
3
4         This program is free software; you can redistribute it and/or
5         modify it under the terms of the GNU General Public License.
6
7         $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
8
9                         Sapan Bhatia <sapanb@cs.princeton.edu> 
10                         
11         7/11/2007       Added data collection (-f) functionality, slice_id support in the header and log file
12                         rotation.
13
14         15/11/2007      Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
15
16 */
17
18 #include <common.h>
19
20 /* stdout, stderr, freopen() */
21 #include <stdio.h>
22
23 /* atoi(), exit() */
24 #include <stdlib.h>
25
26 /* getopt(), alarm(), getpid(), setsid(), chdir() */
27 #include <unistd.h>
28
29 /* strerror() */
30 #include <string.h>
31
32 /* sig*() */
33 #include <signal.h>
34
35 /* statfs() */
36
37 #include <sys/vfs.h>
38
39 #include <libipulog/libipulog.h>
40 /* #include "vserver.h" */
41
42 struct ipulog_handle {
43         int fd;
44         u_int8_t blocking;
45         struct sockaddr_nl local;
46         struct sockaddr_nl peer;
47         struct nlmsghdr* last_nlhdr;
48 };
49
50 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
51 #include <sys/types.h>
52 #include <netinet/in_systm.h>
53 #include <sys/socket.h>
54 #include <netinet/in.h>
55 #include <arpa/inet.h>
56 #include <netinet/ip.h>
57 #include <netinet/tcp.h>
58 #include <netinet/udp.h>
59 #include <netinet/ip_icmp.h>
60 #include <net/if.h>
61
62 #include <sys/param.h>
63 #include <pwd.h>
64 #ifdef OS_LINUX
65 #include <grp.h>
66 #endif
67
68 /* pthread_*() */
69 #include <pthread.h>
70
71 /* errno */
72 #include <errno.h>
73
74 /* getaddrinfo() */
75 #include <netdb.h>
76
77 /* nanosleep() */
78 #include <time.h>
79
80 /* gettimeofday() */
81 #include <sys/time.h>
82
83 /* scheduling */
84 #include <sched.h>
85
86 /* select() (POSIX)*/
87 #include <sys/select.h>
88
89 /* open() */
90 #include <sys/stat.h>
91 #include <fcntl.h>
92
93 #include <fprobe-ulog.h>
94 #include <my_log.h>
95 #include <my_getopt.h>
96 #include <netflow.h>
97 #include <hash.h>
98 #include <mem.h>
99
100 #define PIDFILE                 "/var/log/fprobe-ulog.pid"
101 #define LAST_EPOCH_FILE         "/var/log/fprobe_last_epoch"
102 #define MAX_EPOCH_SIZE          sizeof("32767")
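/* When defined, emitted PDUs are padded up to NETFLOW_PDU_SIZE before being written (see emit_thread()) */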
103 #define STD_NETFLOW_PDU
104
105 enum {
106         aflag,
107         Bflag,
108         bflag,
109         cflag,
110         dflag,
111         Dflag,
112         eflag,
113         Eflag,
114         fflag,
115         gflag,
116         hflag,
117         lflag,
118         mflag,
119         Mflag,
120         nflag,
121         qflag,
122         rflag,
123         sflag,
124         tflag,
125         Tflag,
126         Uflag,
127         uflag,
128         vflag,
129         Wflag,
130         Xflag,
131 };
132
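/* The order of these indices must match the parms[] table below, since option values
   are looked up as parms[<x>flag] */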
133 static struct getopt_parms parms[] = {
134         {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
135         {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
136         {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
137         {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
138         {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
139         {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
140         {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
141         {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
142         {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
143         {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
144         {'h', 0, 0, 0},
145         {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
146         {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
147         {'M', 0, 0, 0},
148         {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
149         {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
150         {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
151         {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
152         {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
153         {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
154         {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
155         {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
156         {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
157         {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
158         {0, 0, 0, 0}
159 };
160
161 extern char *optarg;
162 extern int optind, opterr, optopt;
163 extern int errno;
164
165 extern struct NetFlow NetFlow1;
166 extern struct NetFlow NetFlow5;
167 extern struct NetFlow NetFlow7;
168
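/* Sentinel passed to get_data_file_fd() meaning "no data file opened yet" */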
169 #define START_DATA_FD -5
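/* -M option: interpret the netfilter mark as the ToS byte (see cap_thread()) */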
170 #define mark_is_tos parms[Mflag].count
171 static unsigned scan_interval = 5;
172 static unsigned int min_free = 0;
173 static int frag_lifetime = 30;
174 static int inactive_lifetime = 60;
175 static int active_lifetime = 300;
176 static int sockbufsize;
177 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
178 #if (MEM_BITS == 0) || (MEM_BITS == 16)
179 #define BULK_QUANTITY 10000
180 #else
181 #define BULK_QUANTITY 200
182 #endif
183
184 static unsigned epoch_length=60, log_epochs=1; 
185 static unsigned cur_epoch=0,prev_uptime=0,last_peak=0;
186
187 static unsigned bulk_quantity = BULK_QUANTITY;
188 static unsigned pending_queue_length = 100;
189 static struct NetFlow *netflow = &NetFlow5;
190 static unsigned verbosity = 6;
191 static unsigned log_dest = MY_LOG_SYSLOG;
192 static struct Time start_time;
193 static long start_time_offset;
194 static int off_tl;
195 /* From mem.c */
196 extern unsigned total_elements;
197 extern unsigned free_elements;
198 extern unsigned total_memory;
199 #if ((DEBUG) & DEBUG_I)
200 static unsigned emit_pkts, emit_queue;
201 static uint64_t size_total;
202 static unsigned pkts_total, pkts_total_fragmented;
203 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
204 static unsigned pkts_pending, pkts_pending_done;
205 static unsigned pending_queue_trace, pending_queue_trace_candidate;
206 static unsigned flows_total, flows_fragmented;
207 #endif
208 static unsigned emit_count;
209 static uint32_t emit_sequence;
210 static unsigned emit_rate_bytes, emit_rate_delay;
211 static struct Time emit_time;
212 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
213 static pthread_t thid;
214 static sigset_t sig_mask;
215 static struct sched_param schedp;
216 static int sched_min, sched_max;
217 static int npeers, npeers_rot;
218 static struct peer *peers;
219 static int sigs;
220 static char cur_output_file[MAX_PATH_LEN];
221
222 static struct Flow *flows[1 << HASH_BITS];
223 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
224
225 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
226 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
227
228 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
229 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
230 static struct Flow *pending_head, *pending_tail;
231 static struct Flow *scan_frag_dreg;
232
233 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
234 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
235 static struct Flow *flows_emit;
236
237 static char ident[256] = "fprobe-ulog";
238 static FILE *pidfile;
239 static char *pidfilepath;
240 static pid_t pid;
241 static int killed;
242 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
243 static struct ipulog_handle *ulog_handle;
244 static uint32_t ulog_gmask = 1;
245 static char *cap_buf;
246 static int nsnmp_rules;
247 static struct snmp_rule *snmp_rules;
248 static struct passwd *pw = 0;
249
250 void usage()
251 {
252         fprintf(stdout,
253                         "fprobe-ulog: a NetFlow probe. Version %s\n"
254                         "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
255                         "\n"
256                         "-h\t\tDisplay this help\n"
257                         "-U <mask>\tULOG group bitwise mask [1]\n"
258                         "-s <seconds>\tHow often to scan for expired flows [5]\n"
259                         "-g <seconds>\tFragmented flow lifetime [30]\n"
260                         "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
261                         "-f <filename>\tLog flow data in a file\n"
262                         "-G <graduation period>\tRotate logs on a 0=hourly, 1=daily basis\n"
263                         "-n <version>\tNetFlow version to use (1, 5 or 7) [5]\n"
264                         "-a <address>\tUse <address> as source address for NetFlow flows\n"
265                         "-X <rules>\tInterface name to SNMP-index conversion rules\n"
266                         "-M\t\tUse netfilter mark value as ToS flag\n"
267                         "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
268                         "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
269                         "-q <flows>\tPending queue length [100]\n"
270                         "-B <kilobytes>\tKernel capture buffer size [0]\n"
271                         "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
272                         "-t <B:N>\tProduce a <N> nanosecond delay after each <B> bytes sent [0:0]\n"
273                         "-c <directory>\tDirectory to chroot to\n"
274                         "-u <user>\tUser to run as\n"
275                         "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
276                         "-l <[dst][:id]>\tLog destination and log/pidfile identifier [1]\n"
277                         "-y <remote:port>\tAddress of the NetFlow collector\n"
278                         "-f <writable file>\tFile to write data into\n"
279                         "-T <n>\tRotate log file every n epochs\n"
280                         "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
281                         "-E <[1..60]>\tSize of an epoch in minutes\n"
282                         "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
283                         ,
284                 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
285         exit(0);
286 }
287
288 #if ((DEBUG) & DEBUG_I)
289 void info_debug()
290 {
291         my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
292                         pkts_total, pkts_total_fragmented, size_total,
293                         pkts_pending - pkts_pending_done, pending_queue_trace);
294         my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
295                         pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
296         my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
297                         flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
298         my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
299                         total_elements, free_elements, total_memory);
300 }
301 #endif
302
303 void sighandler(int sig)
304 {
305         switch (sig) {
306                 case SIGTERM:
307                         sigs |= SIGTERM_MASK;
308                         break;
309 #if ((DEBUG) & DEBUG_I)
310                 case SIGUSR1:
311                         sigs |= SIGUSR1_MASK;
312                         break;
313 #endif
314         }
315 }
316
317 void gettime(struct Time *now)
318 {
319         struct timeval t;
320
321         gettimeofday(&t, 0);
322         now->sec = t.tv_sec;
323         now->usec = t.tv_usec;
324 }
325
326
327 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
328 {
329         return (t1->sec - t2->sec)/60;
330 }
331
332 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
333 {
334         return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
335 }
336
337 /* Uptime in milliseconds */
338 uint32_t getuptime(struct Time *t)
339 {
340         /* Maximum uptime is about 49/2 days */
341         return cmpmtime(t, &start_time);
342 }
343
344 /* Uptime in minutes */
345 uint32_t getuptime_minutes(struct Time *t)
346 {
347         /* Same 32-bit wrap caveat as getuptime(), but expressed in minutes */
348         return cmpMtime(t, &start_time);
349 }
350
351 hash_t hash_flow(struct Flow *flow)
352 {
353         if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
354         else return hash(flow, sizeof(struct Flow_TL));
355 }
356
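/* Map an interface name to an SNMP index: try the -X conversion rules first,
 * then fall back to if_nametoindex(); returns (uint16_t)-1 if nothing matches. */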
357 uint16_t snmp_index(char *name) {
358         uint32_t i;
359
360         if (!*name) return 0;
361
362         for (i = 0; (int) i < nsnmp_rules; i++) {
363                 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
364                 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
365         }
366
367         if ((i = if_nametoindex(name))) return i;
368
369         return -1;
370 }
371
372 inline void copy_flow(struct Flow *src, struct Flow *dst)
373 {
374         dst->iif = src->iif;
375         dst->oif = src->oif;
376         dst->sip = src->sip;
377         dst->dip = src->dip;
378         dst->tos = src->tos;
379         dst->slice_id = src->slice_id;
380         dst->proto = src->proto;
381         dst->tcp_flags = src->tcp_flags;
382         dst->id = src->id;
383         dst->sp = src->sp;
384         dst->dp = src->dp;
385         dst->pkts = src->pkts;
386         dst->size = src->size;
387         dst->sizeF = src->sizeF;
388         dst->sizeP = src->sizeP;
389         dst->ctime = src->ctime;
390         dst->mtime = src->mtime;
391         dst->flags = src->flags;
392 }
393
394 void read_cur_epoch() {
395         int fd;
396         /* Reset to -1 in case the read fails */
397         cur_epoch=-1;
398         fd = open(LAST_EPOCH_FILE, O_RDONLY);
399         if (fd != -1) {
400                 char snum[MAX_EPOCH_SIZE];
401                 ssize_t len;
402                 len = read(fd, snum, MAX_EPOCH_SIZE-1);
403                 if (len != -1) {
404                         snum[len]='\0';
405                         sscanf(snum,"%d",&cur_epoch);
406                         cur_epoch++; /* Don't clobber the last epoch's file */
407                 }
408                 close(fd);
409         }
410         return;
411 }
412
413
414 /* Dumps the current epoch in a file to cope with
415  * reboots and killings of fprobe */
416
417 void update_cur_epoch_file(int n) {
418         int fd, len;
419         char snum[MAX_EPOCH_SIZE];
420         len=snprintf(snum, MAX_EPOCH_SIZE-1,"%d", n);
421         fd = open(LAST_EPOCH_FILE, O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
422         if (fd == -1) {
423                 my_log(LOG_ERR, "open() failed: %s. The next restart will resume logging from epoch id 0.", LAST_EPOCH_FILE);
424                 return;
425         }
426         write(fd, snum, len);
427         close(fd);
428 }
429
430 /* Get the file descriptor corresponding to the current file.
431  * The kludgy implementation is to abstract away the 'current
432  * file descriptor', which may also be a socket. 
433  */
434
435 unsigned get_data_file_fd(char *fname, int cur_fd) {
436         struct Time now;
437         unsigned cur_uptime;
438
439         struct statfs statfs;
440         int ret_fd;
441
442         /* If the amount of space left on the disk drops below the threshold, start reusing old logs,
443          * or give up and drop data if that doesn't solve the problem */
444         gettime(&now);
445         cur_uptime = getuptime_minutes(&now);
446
447         if (cur_fd != START_DATA_FD) {
448                 if (fstatfs(cur_fd, &statfs) == -1) {
449                         my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
450                 }
451                 else {
452                         if (min_free && (statfs.f_bavail < min_free)
453                                         && (cur_epoch==last_peak))
454                         {
455                                 my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
456                                 cur_epoch = -1;
457                         }
458                         /*
459                            else 
460                            assume that we can reclaim space by overwriting our own files
461                            and that the difference in size will not fill the disk - sapan
462                            */
463                 }
464         }
465
466         /* If epoch length has been exceeded, 
467          * or we're starting up 
468          * or we're going back to the first epoch */
469         if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
470                 int write_fd;
471                 prev_uptime = cur_uptime;
472                 cur_epoch = (cur_epoch + 1) % log_epochs;
473                 if (cur_epoch>last_peak) last_peak = cur_epoch;
474                 if (cur_fd>0) {
475                         close(cur_fd);
476                         /* Compress the finished file */
477                         char gzip_cmd[MAX_PATH_LEN+sizeof("gzip -f ")];
478                         snprintf(gzip_cmd, MAX_PATH_LEN+sizeof("gzip -f "),"gzip -f %s",cur_output_file);
479                         system(gzip_cmd);
480                 }
481                 snprintf(cur_output_file,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
482                 if ((write_fd = open(cur_output_file, O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH)) < 0) {
483                         my_log(LOG_ERR, "open(): %s (%s)\n", cur_output_file, strerror(errno));
484                         exit(1);
485                 }
486                 if (fchmod(write_fd,S_IRUSR|S_IWUSR|S_IROTH|S_IRGRP) == -1) {
487                         my_log(LOG_ERR, "fchmod() failed: %s (%s). Continuing...\n", cur_output_file, strerror(errno));
488                 }
489                 update_cur_epoch_file(cur_epoch);
490                 ret_fd = write_fd;
491         }
492         else {
493                 ret_fd = cur_fd;
494         }
495         return(ret_fd);
496 }
497
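/* Search a hash chain for a flow matching <sip,dip,proto> plus either the port pair
 * (unfragmented) or the IP id (fragmented).  If prev is given, it is updated to point
 * at the link that references the match so the caller can unlink it. */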
498 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
499 {
500         struct Flow **flowpp;
501
502 #ifdef WALL
503         flowpp = 0;
504 #endif
505
506         if (prev) flowpp = *prev;
507
508         while (where) {
509                 if (where->sip.s_addr == what->sip.s_addr
510                                 && where->dip.s_addr == what->dip.s_addr
511                                 && where->proto == what->proto) {
512                         switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
513                                 case 0:
514                                         /* Both unfragmented */
515                                         if ((what->sp == where->sp)
516                                                         && (what->dp == where->dp)) goto done;
517                                         break;
518                                 case 2:
519                                         /* Both fragmented */
520                                         if (where->id == what->id) goto done;
521                                         break;
522                         }
523                 }
524                 flowpp = &where->next;
525                 where = where->next;
526         }
527 done:
528         if (prev) *prev = flowpp;
529         return where;
530 }
531
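/* Insert a flow into the hash table: COPY_INTO copies it into a freshly allocated
 * entry, MOVE_INTO hands over ownership.  If a matching flow already exists, its
 * counters are merged; once all fragments of a datagram have been seen, the
 * reassembled flow is re-inserted as a normal (unfragmented) flow. */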
532 int put_into(struct Flow *flow, int flag
533 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
534                 , char *logbuf
535 #endif
536                 )
537 {
538         int ret = 0;
539         hash_t h;
540         struct Flow *flown, **flowpp;
541 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
542         char buf[64];
543 #endif
544
545         h = hash_flow(flow);
546 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
547         sprintf(buf, " %x H:%04x", (unsigned) flow, h);
548         strcat(logbuf, buf);
549 #endif
550         pthread_mutex_lock(&flows_mutex[h]);
551         flowpp = &flows[h];
552         if (!(flown = find(flows[h], flow, &flowpp))) {
553                 /* No suitable flow found - add */
554                 if (flag == COPY_INTO) {
555                         if ((flown = mem_alloc())) {
556                                 copy_flow(flow, flown);
557                                 flow = flown;
558                         } else {
559 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
560                                 my_log(LOG_ERR, "%s %s. %s",
561                                                 "mem_alloc():", strerror(errno), "packet lost");
562 #endif
563                                 return -1;
564                         }
565                 }
566                 flow->next = flows[h];
567                 flows[h] = flow;
568 #if ((DEBUG) & DEBUG_I)
569                 flows_total++;
570                 if (flow->flags & FLOW_FRAG) flows_fragmented++;
571 #endif
572 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
573                 if (flown) {
574                         sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
575                         strcat(logbuf, buf);
576                 }
577 #endif
578         } else {
579                 /* Found suitable flow - update */
580 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
581                 sprintf(buf, " +> %x", (unsigned) flown);
582                 strcat(logbuf, buf);
583 #endif
584                 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
585                         flown->mtime = flow->mtime;
586                 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
587                         flown->ctime = flow->ctime;
588                 flown->tcp_flags |= flow->tcp_flags;
589                 flown->size += flow->size;
590                 flown->pkts += flow->pkts;
591
592                 /* The slice_id seen on the first packet of a flow can be misleading. Reset the slice_id of the flow
593                  * if a better value comes along. A good example of this is that by the time CoDemux sets the
594                  * peercred of a flow, it has already been accounted for here and attributed to root. */
595
596                 if (flown->slice_id<1)
597                                 flown->slice_id = flow->slice_id;
598
599
600                 if (flow->flags & FLOW_FRAG) {
601                         /* Fragmented flows require some additional work */
602                         if (flow->flags & FLOW_TL) {
603                                 /*
604                                    ?FIXME?
605                                    Several packets with FLOW_TL (attack)
606                                    */
607                                 flown->sp = flow->sp;
608                                 flown->dp = flow->dp;
609                         }
610                         if (flow->flags & FLOW_LASTFRAG) {
611                                 /*
612                                    ?FIXME?
613                                    Several packets with FLOW_LASTFRAG (attack)
614                                    */
615                                 flown->sizeP = flow->sizeP;
616                         }
617                         flown->flags |= flow->flags;
618                         flown->sizeF += flow->sizeF;
619                         if ((flown->flags & FLOW_LASTFRAG)
620                                         && (flown->sizeF >= flown->sizeP)) {
621                                 /* All fragments received - flow reassembled */
622                                 *flowpp = flown->next;
623                                 pthread_mutex_unlock(&flows_mutex[h]);
624 #if ((DEBUG) & DEBUG_I)
625                                 flows_total--;
626                                 flows_fragmented--;
627 #endif
628                                 flown->id = 0;
629                                 flown->flags &= ~FLOW_FRAG;
630 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
631                                 strcat(logbuf," R");
632 #endif
633                                 ret = put_into(flown, MOVE_INTO
634 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
635                                                 , logbuf
636 #endif
637                                               );
638                         }
639                 }
640                 if (flag == MOVE_INTO) mem_free(flow);
641         }
642         pthread_mutex_unlock(&flows_mutex[h]);
643         return ret;
644 }
645
646 int onlyonce=0;
647
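/* Serialise one flow (or, with flow == 0, the packet header) into the emit buffer,
 * walking the per-version field format table; returns the advanced write pointer. */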
648 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
649 {
650         int i;
651
652         for (i = 0; i < fields; i++) {
653 #if ((DEBUG) & DEBUG_F)
654                 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
655 #endif
656                 switch (format[i]) {
657                         case NETFLOW_IPV4_SRC_ADDR:
658                                 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
659                                 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
660                                 break;
661
662                         case NETFLOW_IPV4_DST_ADDR:
663                                 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
664                                 if ((flow->dip.s_addr == inet_addr("10.0.0.8"))) {
665                                         my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
666                                 }
667                                 p += NETFLOW_IPV4_DST_ADDR_SIZE;
668                                 break;
669
670                         case NETFLOW_INPUT_SNMP:
671                                 *((uint16_t *) p) = htons(flow->iif);
672                                 p += NETFLOW_INPUT_SNMP_SIZE;
673                                 break;
674
675                         case NETFLOW_OUTPUT_SNMP:
676                                 *((uint16_t *) p) = htons(flow->oif);
677                                 p += NETFLOW_OUTPUT_SNMP_SIZE;
678                                 break;
679
680                         case NETFLOW_PKTS_32:
681                                 *((uint32_t *) p) = htonl(flow->pkts);
682                                 p += NETFLOW_PKTS_32_SIZE;
683                                 break;
684
685                         case NETFLOW_BYTES_32:
686                                 *((uint32_t *) p) = htonl(flow->size);
687                                 p += NETFLOW_BYTES_32_SIZE;
688                                 break;
689
690                         case NETFLOW_FIRST_SWITCHED:
691                                 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
692                                 p += NETFLOW_FIRST_SWITCHED_SIZE;
693                                 break;
694
695                         case NETFLOW_LAST_SWITCHED:
696                                 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
697                                 p += NETFLOW_LAST_SWITCHED_SIZE;
698                                 break;
699
700                         case NETFLOW_L4_SRC_PORT:
701                                 *((uint16_t *) p) = flow->sp;
702                                 p += NETFLOW_L4_SRC_PORT_SIZE;
703                                 break;
704
705                         case NETFLOW_L4_DST_PORT:
706                                 *((uint16_t *) p) = flow->dp;
707                                 p += NETFLOW_L4_DST_PORT_SIZE;
708                                 break;
709
710                         case NETFLOW_PROT:
711                                 *((uint8_t *) p) = flow->proto;
712                                 p += NETFLOW_PROT_SIZE;
713                                 break;
714
715                         case NETFLOW_SRC_TOS:
716                                 *((uint8_t *) p) = flow->tos;
717                                 p += NETFLOW_SRC_TOS_SIZE;
718                                 break;
719
720                         case NETFLOW_TCP_FLAGS:
721                                 *((uint8_t *) p) = flow->tcp_flags;
722                                 p += NETFLOW_TCP_FLAGS_SIZE;
723                                 break;
724
725                         case NETFLOW_VERSION:
726                                 *((uint16_t *) p) = htons(netflow->Version);
727                                 p += NETFLOW_VERSION_SIZE;
728                                 break;
729
730                         case NETFLOW_COUNT:
731                                 *((uint16_t *) p) = htons(emit_count);
732                                 p += NETFLOW_COUNT_SIZE;
733                                 break;
734
735                         case NETFLOW_UPTIME:
736                                 *((uint32_t *) p) = htonl(getuptime(&emit_time));
737                                 p += NETFLOW_UPTIME_SIZE;
738                                 break;
739
740                         case NETFLOW_UNIX_SECS:
741                                 *((uint32_t *) p) = htonl(emit_time.sec);
742                                 p += NETFLOW_UNIX_SECS_SIZE;
743                                 break;
744
745                         case NETFLOW_UNIX_NSECS:
746                                 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
747                                 p += NETFLOW_UNIX_NSECS_SIZE;
748                                 break;
749
750                         case NETFLOW_FLOW_SEQUENCE:
751                                 //*((uint32_t *) p) = htonl(emit_sequence);
752                                 *((uint32_t *) p) = 0;
753                                 p += NETFLOW_FLOW_SEQUENCE_SIZE;
754                                 break;
755
756                         case NETFLOW_PAD8:
757                                 /* Unsupported (uint8_t) */
758                         case NETFLOW_ENGINE_TYPE:
759                         case NETFLOW_ENGINE_ID:
760                         case NETFLOW_FLAGS7_1:
761                         case NETFLOW_SRC_MASK:
762                         case NETFLOW_DST_MASK:
763                                 if (!onlyonce) {
764                                         my_log(LOG_CRIT, "Adding SRC/DST masks: this version of fprobe is seriously broken\n");
765                                         onlyonce=1;
766                                 }
767                                 *((uint8_t *) p) = 0;
768                                 p += NETFLOW_PAD8_SIZE;
769                                 break;
770                         case NETFLOW_SLICE_ID:
771                                 *((uint32_t *) p) = flow->slice_id;
772                                 p += NETFLOW_SLICE_ID_SIZE;
773                                 break;
774                         case NETFLOW_PAD16:
775                                 /* Unsupported (uint16_t) */
776                         case NETFLOW_SRC_AS:
777                         case NETFLOW_DST_AS:
778                         case NETFLOW_FLAGS7_2:
779                                 *((uint16_t *) p) = 0;
780                                 p += NETFLOW_PAD16_SIZE;
781                                 break;
782
783                         case NETFLOW_PAD32:
784                                 /* Unsupported (uint32_t) */
785                         case NETFLOW_IPV4_NEXT_HOP:
786                         case NETFLOW_ROUTER_SC:
787                                 *((uint32_t *) p) = 0;
788                                 p += NETFLOW_PAD32_SIZE;
789                                 break;
790
791                         default:
792                                 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
793                                                 format, i, format[i]);
794                                 exit(1);
795                 }
796         }
797 #if ((DEBUG) & DEBUG_F)
798         my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
799 #endif
800         return p;
801 }
802
803 void setuser() {
804         /*
805            Workaround for clone()-based threads
806            Try to change EUID independently of main thread
807            */
808         if (pw) {
809                 setgroups(0, NULL);
810                 setregid(pw->pw_gid, pw->pw_gid);
811                 setreuid(pw->pw_uid, pw->pw_uid);
812         }
813 }
814
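/* Emitter thread: drains the flows_emit list, packs flows into a NetFlow PDU and,
 * when the PDU is full or the emit timeout expires, writes it to every configured
 * peer (file, mirror or rotate), applying the optional -t rate limit. */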
815 void *emit_thread()
816 {
817         struct Flow *flow;
818         void *p;
819         struct timeval now;
820         struct timespec timeout;
821         int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
822
823         p = (void *) &emit_packet + netflow->HeaderSize;
824         timeout.tv_nsec = 0;
825
826         setuser();
827         
828         for (;;) {
829                 pthread_mutex_lock(&emit_mutex);
830                 while (!flows_emit) {
831                         gettimeofday(&now, 0);
832                         timeout.tv_sec = now.tv_sec + emit_timeout;
833                         /* Do not wait until emit_packet is filled - that may take too long */
834                         if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
835                 //my_log(LOG_INFO,"Timeout: %d, %d",emit_count, timeout.tv_sec);
836                                 pthread_mutex_unlock(&emit_mutex);
837                                 goto sendit;
838                         }
839                 }
840                 flow = flows_emit;
841                 flows_emit = flows_emit->next;
842 #if ((DEBUG) & DEBUG_I)
843                 emit_queue--;
844 #endif          
845                 pthread_mutex_unlock(&emit_mutex);
846
847 #ifdef UPTIME_TRICK
848                 if (!emit_count) {
849                         gettime(&start_time);
850                         start_time.sec -= start_time_offset;
851                 }
852 #endif
853                 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
854                 mem_free(flow);
855                 emit_count++;
856 #ifdef PF2_DEBUG
857                 printf("Emit count = %d\n", emit_count);
858                 fflush(stdout);
859 #endif
860                 if (emit_count == netflow->MaxFlows) {
861 sendit:
862                         gettime(&emit_time);
863                         p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
864                         size = netflow->HeaderSize + emit_count * netflow->FlowSize;
865                         /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
866 #ifdef STD_NETFLOW_PDU
867                         if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
868 #endif
869                         peer_rot_cur = 0;
870                         for (i = 0; i < npeers; i++) {
871                                 if (peers[i].type == PEER_FILE) {
872                                         if (netflow->SeqOffset)
873                                                 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
874                                         peers[i].write_fd = get_data_file_fd(peers[i].fname, peers[i].write_fd);
875                                         ret = write(peers[i].write_fd, emit_packet, size);
876                                         if (ret < size) {
877
878 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
879                                                 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
880                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
881 #endif
882 #undef MESSAGES
883                                         }
884 #if ((DEBUG) & DEBUG_E)
885                                         else {
886                                                 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
887                                                                 emit_count, i + 1, peers[i].seq);
888                                         }
889 #endif
890                                         peers[i].seq += emit_count;
891
892                                         /* Rate limit */
893                                         if (emit_rate_bytes) {
894                                                 sent += size;
895                                                 delay = sent / emit_rate_bytes;
896                                                 if (delay) {
897                                                         sent %= emit_rate_bytes;
898                                                         timeout.tv_sec = 0;
899                                                         timeout.tv_nsec = emit_rate_delay * delay;
900                                                         while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
901                                                 }
902                                         }
903                                 }
904                                 else
905                                         if (peers[i].type == PEER_MIRROR) goto sendreal;
906                                         else
907                                                 if (peers[i].type == PEER_ROTATE) 
908                                                         if (peer_rot_cur++ == peer_rot_work) {
909 sendreal:
910                                                                 if (netflow->SeqOffset)
911                                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
912                                                                 ret = send(peers[i].write_fd, emit_packet, size, 0);
913                                                                 if (ret < size) {
914 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
915                                                                         my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
916                                                                                         i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
917 #endif
918                                                                 }
919 #if ((DEBUG) & DEBUG_E)
920                                                                 else {
921                                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
922                                                                                         emit_count, i + 1, peers[i].seq);
923                                                                 }
924 #endif
925                                                                 peers[i].seq += emit_count;
926
927                                                                 /* Rate limit */
928                                                                 if (emit_rate_bytes) {
929                                                                         sent += size;
930                                                                         delay = sent / emit_rate_bytes;
931                                                                         if (delay) {
932                                                                                 sent %= emit_rate_bytes;
933                                                                                 timeout.tv_sec = 0;
934                                                                                 timeout.tv_nsec = emit_rate_delay * delay;
935                                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
936                                                                         }
937                                                                 }
938                                                         }
939                         }
940                         if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
941                         emit_sequence += emit_count;
942                         emit_count = 0;
943 #if ((DEBUG) & DEBUG_I)
944                         emit_pkts++;
945 #endif
946                 }
947         }
948 }       
949
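/* Unpending thread: moves packets parked in the pending ring by cap_thread()
 * into the flow hash table via put_into(). */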
950 void *unpending_thread()
951 {
952         struct timeval now;
953         struct timespec timeout;
954 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
955         char logbuf[256];
956 #endif
957
958         setuser();
959
960         timeout.tv_nsec = 0;
961         pthread_mutex_lock(&unpending_mutex);
962
963         for (;;) {
964                 while (!(pending_tail->flags & FLOW_PENDING)) {
965                         gettimeofday(&now, 0);
966                         timeout.tv_sec = now.tv_sec + unpending_timeout;
967                         pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
968                 }
969
970 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
971                 *logbuf = 0;
972 #endif
973                 if (put_into(pending_tail, COPY_INTO
974 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
975                                         , logbuf
976 #endif
977                             ) < 0) {
978 #if ((DEBUG) & DEBUG_I)
979                         pkts_lost_unpending++;
980 #endif                          
981                 }
982
983 #if ((DEBUG) & DEBUG_U)
984                 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
985 #endif
986
987                 pending_tail->flags = 0;
988                 pending_tail = pending_tail->next;
989 #if ((DEBUG) & DEBUG_I)
990                 pkts_pending_done++;
991 #endif
992         }
993 }
994
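/* Scanner thread: every scan_interval seconds walks the flow hash table, moves
 * expired flows onto the emit list and times out stale fragmented flows. */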
995 void *scan_thread()
996 {
997 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
998         char logbuf[256];
999 #endif
1000         int i;
1001         struct Flow *flow, **flowpp;
1002         struct Time now;
1003         struct timespec timeout;
1004
1005         setuser();
1006
1007         timeout.tv_nsec = 0;
1008         pthread_mutex_lock(&scan_mutex);
1009
1010         for (;;) {
1011                 gettime(&now);
1012                 timeout.tv_sec = now.sec + scan_interval;
1013                 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
1014
1015                 gettime(&now);
1016 #if ((DEBUG) & DEBUG_S)
1017                 my_log(LOG_DEBUG, "S: %d", now.sec);
1018 #endif
1019                 for (i = 0; i < 1 << HASH_BITS ; i++) {
1020                         pthread_mutex_lock(&flows_mutex[i]);
1021                         flow = flows[i];
1022                         flowpp = &flows[i];
1023                         while (flow) {
1024                                 if (flow->flags & FLOW_FRAG) {
1025                                         /* Process fragmented flow */
1026                                         if ((now.sec - flow->mtime.sec) > frag_lifetime) {
1027                                                 /* Fragmented flow expired - put it into special chain */
1028 #if ((DEBUG) & DEBUG_I)
1029                                                 flows_fragmented--;
1030                                                 flows_total--;
1031 #endif
1032                                                 *flowpp = flow->next;
1033                                                 flow->id = 0;
1034                                                 flow->flags &= ~FLOW_FRAG;
1035                                                 flow->next = scan_frag_dreg;
1036                                                 scan_frag_dreg = flow;
1037                                                 flow = *flowpp;
1038                                                 continue;
1039                                         }
1040                                 } else {
1041                                         /* Flow is not fragmented */
1042                                         if ((now.sec - flow->mtime.sec) > inactive_lifetime
1043                                                         || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
1044                                                 /* Flow expired */
1045 #if ((DEBUG) & DEBUG_S)
1046                                                 my_log(LOG_DEBUG, "S: E %x", flow);
1047 #endif
1048 #if ((DEBUG) & DEBUG_I)
1049                                                 flows_total--;
1050 #endif
1051                                                 *flowpp = flow->next;
1052                                                 pthread_mutex_lock(&emit_mutex);
1053                                                 flow->next = flows_emit;
1054                                                 flows_emit = flow;
1055 #if ((DEBUG) & DEBUG_I)
1056                                                 emit_queue++;
1057 #endif                          
1058                                                 pthread_mutex_unlock(&emit_mutex);
1059                                                 flow = *flowpp;
1060                                                 continue;
1061                                         }
1062                                 }
1063                                 flowpp = &flow->next;
1064                                 flow = flow->next;
1065                         } /* chain loop */
1066                         pthread_mutex_unlock(&flows_mutex[i]);
1067                 } /* hash loop */
1068                 if (flows_emit) pthread_cond_signal(&emit_cond);
1069
1070                 while (scan_frag_dreg) {
1071                         flow = scan_frag_dreg;
1072                         scan_frag_dreg = flow->next;
1073 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1074                         *logbuf = 0;
1075 #endif
1076                         put_into(flow, MOVE_INTO
1077 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1078                                         , logbuf
1079 #endif
1080                                 );
1081 #if ((DEBUG) & DEBUG_S)
1082                         my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
1083 #endif
1084                 }
1085         }
1086 }
1087
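/* Capture thread: reads packets from the ULOG netlink socket, builds a Flow record
 * from the IP/transport headers and hands it to unpending_thread() via the pending ring. */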
1088 void *cap_thread()
1089 {
1090         struct ulog_packet_msg *ulog_msg;
1091         struct ip *nl;
1092         void *tl;
1093         struct Flow *flow;
1094         int len, off_frag, psize;
1095 #if ((DEBUG) & DEBUG_C)
1096         char buf[64];
1097         char logbuf[256];
1098 #endif
1099         int challenge;
1100
1101         setuser();
1102
1103         while (!killed) {
1104                 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1105                 if (len <= 0) {
1106                         my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1107                         continue;
1108                 }
1109                 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1110
1111 #if ((DEBUG) & DEBUG_C)
1112                         sprintf(logbuf, "C: %d", ulog_msg->data_len);
1113 #endif
1114
1115                         nl = (void *) &ulog_msg->payload;
1116                         psize = ulog_msg->data_len;
1117
1118                         /* Sanity check */
1119                         if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1120 #if ((DEBUG) & DEBUG_C)
1121                                 strcat(logbuf, " U");
1122                                 my_log(LOG_DEBUG, "%s", logbuf);
1123 #endif
1124 #if ((DEBUG) & DEBUG_I)
1125                                 pkts_ignored++;
1126 #endif
1127                                 continue;
1128                         }
1129
1130                         if (pending_head->flags) {
1131 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1132                                 my_log(LOG_ERR,
1133 # if ((DEBUG) & DEBUG_C)
1134                                                 "%s %s %s", logbuf,
1135 # else
1136                                                 "%s %s",
1137 # endif
1138                                                 "pending queue full:", "packet lost");
1139 #endif
1140 #if ((DEBUG) & DEBUG_I)
1141                                 pkts_lost_capture++;
1142 #endif
1143                                 goto done;
1144                         }
1145
1146 #if ((DEBUG) & DEBUG_I)
1147                         pkts_total++;
1148 #endif
1149
1150                         flow = pending_head;
1151
1152                         /* ?FIXME? Add sanity check for ip_len? */
1153                         flow->size = ntohs(nl->ip_len);
1154 #if ((DEBUG) & DEBUG_I)
1155                         size_total += flow->size;
1156 #endif
1157
1158                         flow->sip = nl->ip_src;
1159                         flow->dip = nl->ip_dst;
1160                         flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1161                         
1162                         /* It's going to be expensive calling this syscall on every flow.
1163                          * We should keep a local hash table, for now just bear the overhead... - Sapan*/
1164
1165                         flow->slice_id = ulog_msg->mark;
1166
1167                         /*if (flow->slice_id < 1)
1168                                 flow->slice_id = ulog_msg->mark; // Couldn't look up the slice id, let's at least store the local xid*/
1169
1170
1171                         if ((flow->dip.s_addr == inet_addr("10.0.0.8")) || (flow->sip.s_addr == inet_addr("10.0.0.8"))) {
1172                                 my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->slice_id);
1173                         }
1174                         flow->iif = snmp_index(ulog_msg->indev_name);
1175                         flow->oif = snmp_index(ulog_msg->outdev_name);
1176                         flow->proto = nl->ip_p;
1177                         flow->id = 0;
1178                         flow->tcp_flags = 0;
1179                         flow->pkts = 1;
1180                         flow->sizeF = 0;
1181                         flow->sizeP = 0;
1182                         /* Packets captured from the OUTPUT table don't contain a valid timestamp */
1183                         if (ulog_msg->timestamp_sec) {
1184                                 flow->ctime.sec = ulog_msg->timestamp_sec;
1185                                 flow->ctime.usec = ulog_msg->timestamp_usec;
1186                         } else gettime(&flow->ctime);
1187                         flow->mtime = flow->ctime;
1188
1189                         off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1190
1191                         /*
1192                            Offset (from network layer) to transport layer header/IP data
1193                            IOW IP header size ;-)
1194
1195                            ?FIXME?
1196                            Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1197                            */
1198                         off_tl = nl->ip_hl << 2;
1199                         tl = (void *) nl + off_tl;
1200
1201                         /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1202                         flow->sizeF = ntohs(nl->ip_len) - off_tl;
1203                         psize -= off_tl;
1204                         if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1205                         if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1206
1207                         if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1208                                 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1209 #if ((DEBUG) & DEBUG_C)
1210                                 strcat(logbuf, " F");
1211 #endif
1212 #if ((DEBUG) & DEBUG_I)
1213                                 pkts_total_fragmented++;
1214 #endif
1215                                 flow->flags |= FLOW_FRAG;
1216                                 flow->id = nl->ip_id;
1217
1218                                 if (!(ntohs(nl->ip_off) & IP_MF)) {
1219                                         /* The packet without IP_MF (the last fragment) tells us the whole datagram size */
1220                                         flow->flags |= FLOW_LASTFRAG;
1221                                         /* size = frag_offset*8 + data_size */
1222                                         flow->sizeP = off_frag + flow->sizeF;
1223                                 }
1224                         }
1225
1226 #if ((DEBUG) & DEBUG_C)
1227                         sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1228                         strcat(logbuf, buf);
1229                         sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1230                         strcat(logbuf, buf);
1231 #endif
1232
1233                         /*
1234                            Fortunately, the most interesting transport layer information fits
1235                            into the first 8 bytes of the IP data field (the minimal nonzero size).
1236                            Thus we don't need actual packet reassembly to recover the whole
1237                            transport layer data. We only check for a zero fragment offset to
1238                            find the packet that carries this information.
1239                            */
1240                         if (!off_frag && psize >= 8) {
1241                                 switch (flow->proto) {
1242                                         case IPPROTO_TCP:
1243                                         case IPPROTO_UDP:
1244                                                 flow->sp = ((struct udphdr *)tl)->uh_sport;
1245                                                 flow->dp = ((struct udphdr *)tl)->uh_dport;
1246                                                 goto tl_known;
1247
1248 #ifdef ICMP_TRICK
1249                                         case IPPROTO_ICMP:
1250                                                 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1251                                                 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1252                                                 goto tl_known;
1253 #endif
1254 #ifdef ICMP_TRICK_CISCO
1255                                         case IPPROTO_ICMP:
1256                                                 flow->dp = *((int32_t *) tl);
1257                                                 goto tl_known;
1258 #endif
1259
1260                                         default:
1261                                                 /* Unknown transport layer */
1262 #if ((DEBUG) & DEBUG_C)
1263                                                 strcat(logbuf, " U");
1264 #endif
1265                                                 flow->sp = 0;
1266                                                 flow->dp = 0;
1267                                                 break;
1268
1269 tl_known:
1270 #if ((DEBUG) & DEBUG_C)
1271                                                 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1272                                                 strcat(logbuf, buf);
1273 #endif
1274                                                 flow->flags |= FLOW_TL;
1275                                 }
1276                         }
1277
1278                         /* Check for tcp flags presence (including CWR and ECE). */
1279                         if (flow->proto == IPPROTO_TCP
1280                                         && off_frag < 16
1281                                         && psize >= 16 - off_frag) {
1282                                 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1283 #if ((DEBUG) & DEBUG_C)
1284                                 sprintf(buf, " TCP:%x", flow->tcp_flags);
1285                                 strcat(logbuf, buf);
1286 #endif
1287                         }
1288
1289 #if ((DEBUG) & DEBUG_C)
1290                         sprintf(buf, " => %x", (unsigned) flow);
1291                         strcat(logbuf, buf);
1292                         my_log(LOG_DEBUG, "%s", logbuf);
1293 #endif
1294
1295 #if ((DEBUG) & DEBUG_I)
1296                         pkts_pending++;
1297                         pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1298                         if (pending_queue_trace < pending_queue_trace_candidate)
1299                                 pending_queue_trace = pending_queue_trace_candidate;
1300 #endif
1301
1302                         /* Flow complete - inform unpending_thread() about it */
1303                         pending_head->flags |= FLOW_PENDING;
1304                         pending_head = pending_head->next;
1305 done:
1306                         pthread_cond_signal(&unpending_cond);
1307                 }
1308         }
1309         return 0;
1310 }
1311
1312 /* Copied out of CoDemux */
1313
1314 static int init_daemon() {
1315         pid_t pid;
1316         FILE *pidfile;
1317
1318                 pidfile = fopen(PIDFILE, "w");
1319                 if (pidfile == NULL) {
1320                         my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
                        /* without a pid file we cannot record the child pid below,
                           and fclose(NULL)/fprintf(NULL) would crash - bail out */
                        return(-1);
1321                 }
1322
1323         if ((pid = fork()) < 0) {
1324                 fclose(pidfile);
1325                 my_log(LOG_ERR, "Could not fork!\n");
1326                 return(-1);
1327         }
1328         else if (pid != 0) {
1329                 /* I'm the parent: write down the child pid and exit */
1330                 fprintf(pidfile, "%u\n", pid);
1331                 fclose(pidfile);
1332                 exit(0);
1333         }
1334
1335         /* close the pid file */
1336         fclose(pidfile);
1337
1338         /* standard steps for any daemon process:
1339            1. create a new session
1340            2. change the working directory (here /var/local/fprobe)
1341            3. clear the file creation mask
1342            */
1343         setsid();
1344         chdir("/var/local/fprobe");
1345         umask(0);
1346
1347         return(0);
1348 }
1349
1350 int main(int argc, char **argv)
1351 {
1352         char errpbuf[512];
1353         char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1354         int c, i, write_fd, memory_limit = 0;
1355         struct addrinfo hints, *res;
1356         struct sockaddr_in saddr;
1357         pthread_attr_t tattr;
1358         struct sigaction sigact;
1359         static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
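	/* The four worker threads (the main thread is the fifth): judging by their
	   names, emit_thread builds and sends NetFlow PDUs, scan_thread expires idle
	   flows, unpending_thread drains the pending ring into the flow hash, and
	   cap_thread pulls packets off the ULOG socket. */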
1360         struct timeval timeout;
1361
1362         sched_min = sched_get_priority_min(SCHED);
1363         sched_max = sched_get_priority_max(SCHED);
1364
1365         memset(&saddr, 0 , sizeof(saddr));
1366         memset(&hints, 0 , sizeof(hints));
1367         hints.ai_flags = AI_PASSIVE;
1368         hints.ai_family = AF_INET;
1369         hints.ai_socktype = SOCK_DGRAM;
1370
1371         /* Process command line options */
1372
1373         opterr = 0;
1374         while ((c = my_getopt(argc, argv, parms)) != -1) {
1375                 switch (c) {
1376                         case '?':
1377                                 usage();
1378
1379                         case 'h':
1380                                 usage();
1381                 }
1382         }
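	/* my_getopt() has already stored each option's count and argument in parms[];
	   only '-h' and unknown options are handled here, everything else is read
	   back from parms[] below. */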
1383
1384         if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1385         if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1386         if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1387         if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1388         if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1389         if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1390         if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1391         if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1392         if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1393         if (parms[nflag].count) {
1394                 switch (atoi(parms[nflag].arg)) {
1395                         case 1:
1396                                 netflow = &NetFlow1;
1397                                 break;
1398
1399                         case 5:
1400                                 break;
1401
1402                         case 7:
1403                                 netflow = &NetFlow7;
1404                                 break;
1405
1406                         default:
1407                                 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1408                                 exit(1);
1409                 }
1410         }
1411         if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
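	/* -l takes "<dest>[:<suffix>]": a suffix is appended to the log ident in
	   brackets, then the ':' is put back so the original argument can be echoed
	   verbatim in the options line logged below. */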
1412         if (parms[lflag].count) {
1413                 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1414                         *log_suffix++ = 0;
1415                         if (*log_suffix) {
1416                                 sprintf(errpbuf, "[%s]", log_suffix);
1417                                 strcat(ident, errpbuf);
1418                         }
1419                 }
1420                 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1421                 if (log_suffix) *--log_suffix = ':';
1422         }
1423         if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1424 err_malloc:
1425                 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1426                 exit(1);
1427         }
1428         sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1429         if (parms[qflag].count) {
1430                 pending_queue_length = atoi(parms[qflag].arg);
1431                 if (pending_queue_length < 1) {
1432                         fprintf(stderr, "Illegal %s\n", "pending queue length");
1433                         exit(1);
1434                 }
1435         }
1436         if (parms[rflag].count) {
1437                 schedp.sched_priority = atoi(parms[rflag].arg);
1438                 if (schedp.sched_priority
1439                                 && (schedp.sched_priority < sched_min
1440                                         || schedp.sched_priority > sched_max)) {
1441                         fprintf(stderr, "Illegal %s\n", "realtime priority");
1442                         exit(1);
1443                 }
1444         }
1445         if (parms[Bflag].count) {
1446                 sockbufsize = atoi(parms[Bflag].arg) << 10;
1447         }
1448         if (parms[bflag].count) {
1449                 bulk_quantity = atoi(parms[bflag].arg);
1450                 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1451                         fprintf(stderr, "Illegal %s\n", "bulk size");
1452                         exit(1);
1453                 }
1454         }
1455         if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
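	/* -X SNMP-index rules, "<ifname>,<base>[:<ifname>,<base>...]".  As in stock
	   fprobe, an interface matching <ifname> is presumably reported with
	   ifIndex <base> plus its unit number (e.g. "eth,10" would map eth1 to 11).
	   The parser below NUL-splits the argument and restores the separators. */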
1456         if (parms[Xflag].count) {
1457                 for(i = 0; parms[Xflag].arg[i]; i++)
1458                         if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1459                 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1460                         goto err_malloc;
1461                 rule = strtok(parms[Xflag].arg, ":");
1462                 for (i = 0; rule; i++) {
1463                         snmp_rules[i].len = strlen(rule);
1464                         if (snmp_rules[i].len > IFNAMSIZ) {
1465                                 fprintf(stderr, "Illegal %s\n", "interface basename");
1466                                 exit(1);
1467                         }
1468                         strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1469                         if (!*(rule - 1)) *(rule - 1) = ',';
1470                         rule = strtok(NULL, ",");
1471                         if (!rule) {
1472                                 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1473                                 exit(1);
1474                         }
1475                         snmp_rules[i].base = atoi(rule);
1476                         *(rule - 1) = ':';
1477                         rule = strtok(NULL, ":");
1478                 }
1479                 nsnmp_rules = i;
1480         }
1481         if (parms[tflag].count)
1482                 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1483         if (parms[aflag].count) {
1484                 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1485 bad_lhost:
1486                         fprintf(stderr, "Illegal %s\n", "source address");
1487                         exit(1);
1488                 } else {
1489                         saddr = *((struct sockaddr_in *) res->ai_addr);
1490                         freeaddrinfo(res);
1491                 }
1492         }
1493         if (parms[uflag].count) 
1494                 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1495                         fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1496                         exit(1);
1497                 }
1498
1499
1500         /* Process collectors parameters. Brrrr... :-[ */
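	/* Each remaining argument names a collector, "<host>:<port>[/<lhost>[/<type>]]"
	   with <type> 'm' (mirror, the default) or 'r' (rotate), e.g.
	   "10.0.0.1:2055/0.0.0.0/r".  Separators are NUL-split for parsing and
	   restored afterwards to keep argv intact. */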
1501
1502         npeers = argc - optind;
1503         if (npeers >= 1) {
1504                 /* Send to remote Netflow collector */
1505                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1506                 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1507                         dhost = argv[i];
1508                         if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1509                         *dport++ = 0;
1510                         if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1511                                 fprintf(stderr, "socket(): %s\n", strerror(errno));
1512                                 exit(1);
1513                         }
1514                         peers[npeers].write_fd = write_fd;
1515                         peers[npeers].type = PEER_MIRROR;
1516                         peers[npeers].laddr = saddr;
1517                         peers[npeers].seq = 0;
1518                         if ((lhost = strchr(dport, '/'))) {
1519                                 *lhost++ = 0;
1520                                 if ((type = strchr(lhost, '/'))) {
1521                                         *type++ = 0;
1522                                         switch (*type) {
1523                                                 case 0:
1524                                                 case 'm':
1525                                                         break;
1526
1527                                                 case 'r':
1528                                                         peers[npeers].type = PEER_ROTATE;
1529                                                         npeers_rot++;
1530                                                         break;
1531
1532                                                 default:
1533                                                         goto bad_collector;
1534                                         }
1535                                 }
1536                                 if (*lhost) {
1537                                         if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1538                                         peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1539                                         freeaddrinfo(res);
1540                                 }
1541                         }
1542                         if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1543                                                 sizeof(struct sockaddr_in))) {
1544                                 fprintf(stderr, "bind(): %s\n", strerror(errno));
1545                                 exit(1);
1546                         }
1547                         if (getaddrinfo(dhost, dport, &hints, &res)) {
1548 bad_collector:
1549                                 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1550                                 exit(1);
1551                         }
1552                         peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1553                         freeaddrinfo(res);
1554                         if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1555                                                 sizeof(struct sockaddr_in))) {
1556                                 fprintf(stderr, "connect(): %s\n", strerror(errno));
1557                                 exit(1);
1558                         }
1559
1560                         /* Restore command line */
1561                         if (type) *--type = '/';
1562                         if (lhost) *--lhost = '/';
1563                         *--dport = ':';
1564                 }
1565         }
1566         else if (parms[fflag].count) {
1567                 /* -f: log flows into a local file; exactly one file peer is set up */
1568                 if (!(peers = malloc(sizeof(struct peer)))) goto err_malloc;
1569                 if (!(peers[npeers].fname = malloc(MAX_PATH_LEN + 1))) goto err_malloc;
1570                 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
                peers[npeers].fname[MAX_PATH_LEN] = 0;  /* strncpy() does not guarantee termination */
1571
1572                 peers[npeers].write_fd = START_DATA_FD;
1573                 peers[npeers].type = PEER_FILE;
1574                 peers[npeers].seq = 0;
1575
1576                 read_cur_epoch();
1577                 npeers++;
1578         }
1579         else 
1580                 usage();
1581
1582
1583         if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1584         ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1585         if (!ulog_handle) {
1586                 fprintf(stderr, "libipulog initialization error: %s\n",
1587                                 ipulog_strerror(ipulog_errno));
1588                 exit(1);
1589         }
1590         if (sockbufsize)
1591                 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1592                                         &sockbufsize, sizeof(sockbufsize)) < 0)
1593                         fprintf(stderr, "setsockopt(): %s\n", strerror(errno));
1594
1595         /* Daemonize (if log destination stdout-free) */
1596
1597         my_log_open(ident, verbosity, log_dest);
1598
1599         init_daemon();
1600
1601         if (!(log_dest & 2)) {
1602                 /* Crash-proofing (Sapan): the parent keeps re-forking a worker child whenever the previous one exits; only the child proceeds past this loop. */
1603                 while (1) {
1604                         int pid=fork();
1605                         if (pid==-1) {
1606                                 fprintf(stderr, "fork(): %s", strerror(errno));
1607                                 exit(1);
1608                         }
1609                         else if (pid==0) {
1610                                 setsid();
1611                                 freopen("/dev/null", "r", stdin);
1612                                 freopen("/dev/null", "w", stdout);
1613                                 freopen("/dev/null", "w", stderr);
1614                                 break;
1615                         }
1616                         else {
1617                                 while (wait3(NULL,0,NULL) < 1);
1618                         }
1619                 }
1620         } else {
1621                 setvbuf(stdout, (char *)0, _IONBF, 0);
1622                 setvbuf(stderr, (char *)0, _IONBF, 0);
1623         }
1624
1625         pid = getpid();
1626         sprintf(errpbuf, "[%ld]", (long) pid);
1627         strcat(ident, errpbuf);
1628
1629         /* Initialization */
1630
1631         /* init_slice_id_hash(); */
1632         hash_init(); /* Actually for crc16 only */
1633         mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1634         for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1635
1636 #ifdef UPTIME_TRICK
1637         /* Hope 12 days is enough :-/ */
1638         start_time_offset = 1 << 20;
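	/* 1 << 20 seconds is roughly 12.1 days of assumed prior uptime */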
1639
1640         /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1641 #endif
1642         gettime(&start_time);
1643
1644         /*
1645            Build static pending queue as circular buffer.
1646            */
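	/* pending_head is where the capture thread stores fresh packets;
	   unpending_thread drains them starting at pending_tail. */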
1647         if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1648         pending_tail = pending_head;
1649         for (i = pending_queue_length - 1; i--;) {
1650                 if (!(pending_tail->next = mem_alloc())) {
1651 err_mem_alloc:
1652                         my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1653                         exit(1);
1654                 }
1655                 pending_tail = pending_tail->next;
1656         }
1657         pending_tail->next = pending_head;
1658         pending_tail = pending_head;
1659
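	/* SIGTERM (and SIGUSR1 when built with DEBUG_I) is handled by sighandler(),
	   which only sets bits in `sigs'.  The signals are blocked here so the
	   worker threads inherit the blocked mask; only the main thread unblocks
	   them, further below, and reacts to them in its wait loop. */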
1660         sigemptyset(&sig_mask);
1661         sigact.sa_handler = &sighandler;
1662         sigact.sa_mask = sig_mask;
1663         sigact.sa_flags = 0;
1664         sigaddset(&sig_mask, SIGTERM);
1665         sigaction(SIGTERM, &sigact, 0);
1666 #if ((DEBUG) & DEBUG_I)
1667         sigaddset(&sig_mask, SIGUSR1);
1668         sigaction(SIGUSR1, &sigact, 0);
1669 #endif
1670         if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1671                 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1672                 exit(1);
1673         }
1674
1675         my_log(LOG_INFO, "Starting %s...", VERSION);
1676
1677         if (parms[cflag].count) {
1678                 if (chdir(parms[cflag].arg) || chroot(".")) {
1679                         my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1680                         exit(1);
1681                 }
1682         }
1683
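	/* With a real-time priority requested (-r), threads are created with
	   ascending priorities: the first worker gets (requested - THREADS + 2) and
	   each subsequent one is a step higher, leaving cap_thread, created last,
	   the most favoured. */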
1684         schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1685         pthread_attr_init(&tattr);
1686         for (i = 0; i < THREADS - 1; i++) {
1687                 if (schedp.sched_priority > 0) {
1688                         if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1689                                         (pthread_attr_setschedparam(&tattr, &schedp))) {
1690                                 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1691                                 exit(1);
1692                         }
1693                 }
1694                 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1695                         my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1696                         exit(1);
1697                 }
1698                 pthread_detach(thid);
1699                 schedp.sched_priority++;
1700         }
1701
1702         if (pw) {
1703                 if (setgroups(0, NULL)) {
1704                         my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1705                         exit(1);
1706                 }
1707                 if (setregid(pw->pw_gid, pw->pw_gid)) {
1708                         my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1709                         exit(1);
1710                 }
1711                 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1712                         my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1713                         exit(1);
1714                 }
1715         }
1716
1717         if (!(pidfile = fopen(pidfilepath, "w")))
1718                 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1719         else {
1720                 fprintf(pidfile, "%ld\n", (long) pid);
1721                 fclose(pidfile);
1722         }
1723
1724         my_log(LOG_INFO, "pid: %d", pid);
1725         my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1726                         "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1727                         ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1728                         netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1729                         memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1730                         emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1731                         parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1732         for (i = 0; i < nsnmp_rules; i++) {
1733                 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1734                                 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1735         }
1736         for (i = 0; i < npeers; i++) {
1737                 switch (peers[i].type) {
1738                         case PEER_MIRROR:
1739                                 c = 'm';
1740                                 break;
1741                         case PEER_ROTATE:
1742                                 c = 'r';
1743                                 break;
                        default:
                                c = 'f';        /* PEER_FILE: flows go to a local file */
                                break;
1744                 }
1745                 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1746                 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1747                                 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1748         }
1749
1750         pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1751
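	/* Main wait loop: select() with no descriptors is used as an interruptible
	   sleep of scan_interval seconds.  On SIGTERM every lifetime is collapsed so
	   the cache drains quickly, and the loop only ends once all flows have been
	   emitted and the pending ring is empty. */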
1752         timeout.tv_usec = 0;
1753         while (!killed
1754                         || (total_elements - free_elements - pending_queue_length)
1755                         || emit_count
1756                         || pending_tail->flags) {
1757
1758                 if (!sigs) {
1759                         timeout.tv_sec = scan_interval;
1760                         select(0, 0, 0, 0, &timeout);
1761                 }
1762
1763                 if (sigs & SIGTERM_MASK && !killed) {
1764                         sigs &= ~SIGTERM_MASK;
1765                         my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1766                         scan_interval = 1;
1767                         frag_lifetime = -1;
1768                         active_lifetime = -1;
1769                         inactive_lifetime = -1;
1770                         emit_timeout = 1;
1771                         unpending_timeout = 1;
1772                         killed = 1;
1773                         pthread_cond_signal(&scan_cond);
1774                         pthread_cond_signal(&unpending_cond);
1775                 }
1776
1777 #if ((DEBUG) & DEBUG_I)
1778                 if (sigs & SIGUSR1_MASK) {
1779                         sigs &= ~SIGUSR1_MASK;
1780                         info_debug();
1781                 }
1782 #endif
1783         }
1784         remove(pidfilepath);
1785 #if ((DEBUG) & DEBUG_I)
1786         info_debug();
1787 #endif
1788         my_log(LOG_INFO, "Done.");
1789 #ifdef WALL
1790         return 0;
1791 #endif
1792 }