c8b5c25cb6189844f8c03c30f5bd8d431d584431
[fprobe-ulog.git] / src / fprobe-ulog.c
1 /*
2         Copyright (C) Slava Astashonok <sla@0n.ru>
3
4         This program is free software; you can redistribute it and/or
5         modify it under the terms of the GNU General Public License.
6
7         $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
8
9                         Sapan Bhatia <sapanb@cs.princeton.edu> 
10                         
11         7/11/2007       Added data collection (-f) functionality, xid support in the header and log file
12                         rotation.
13
14         15/11/2007      Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
15
16 */
17
18 #include <common.h>
19
20 /* stdout, stderr, freopen() */
21 #include <stdio.h>
22
23 /* atoi(), exit() */
24 #include <stdlib.h>
25
/* getopt(), alarm(), getpid(), setsid(), chdir() */
27 #include <unistd.h>
28
29 /* strerror() */
30 #include <string.h>
31
32 /* sig*() */
33 #include <signal.h>
34
35 /* statfs() */
36
37 #include <sys/vfs.h>
38
39 #include <libipulog/libipulog.h>
40 #include "vserver.h"
41
/*
 * Local mirror of libipulog's handle layout so this file can reach
 * into its fields directly.
 * NOTE(review): this duplicates a structure that belongs to
 * libipulog; if the library's internal layout ever changes, code that
 * pokes into these fields breaks silently — confirm it matches the
 * bundled libipulog version.
 */
struct ipulog_handle {
	int fd;                      /* netlink socket descriptor */
	u_int8_t blocking;           /* non-zero => blocking reads */
	struct sockaddr_nl local;    /* our netlink address */
	struct sockaddr_nl peer;     /* kernel-side netlink address */
	struct nlmsghdr* last_nlhdr; /* cursor into the last received batch */
};
49
50 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
51 #include <sys/types.h>
52 #include <netinet/in_systm.h>
53 #include <sys/socket.h>
54 #include <netinet/in.h>
55 #include <arpa/inet.h>
56 #include <netinet/ip.h>
57 #include <netinet/tcp.h>
58 #include <netinet/udp.h>
59 #include <netinet/ip_icmp.h>
60 #include <net/if.h>
61
62 #include <sys/param.h>
63 #include <pwd.h>
64 #ifdef OS_LINUX
65 #include <grp.h>
66 #endif
67
68 /* pthread_*() */
69 #include <pthread.h>
70
71 /* errno */
72 #include <errno.h>
73
74 /* getaddrinfo() */
75 #include <netdb.h>
76
77 /* nanosleep() */
78 #include <time.h>
79
80 /* gettimeofday() */
81 #include <sys/time.h>
82
83 /* scheduling */
84 #include <sched.h>
85
86 /* select() (POSIX)*/
87 #include <sys/select.h>
88
89 /* open() */
90 #include <sys/stat.h>
91 #include <fcntl.h>
92
93 #include <fprobe-ulog.h>
94 #include <my_log.h>
95 #include <my_getopt.h>
96 #include <netflow.h>
97 #include <hash.h>
98 #include <mem.h>
99
/* Where the daemon records its PID. */
#define PIDFILE			"/var/log/fprobe-ulog.pid"
/* Persisted counter of the last used log epoch; lets a restarted
 * probe resume rotation where it left off (see read_cur_epoch()). */
#define LAST_EPOCH_FILE		"/var/log/fprobe_last_epoch"
/* Buffer size for the textual epoch number ("32767" plus NUL). */
#define MAX_EPOCH_SIZE		sizeof("32767")
/* When defined, emitted NetFlow PDUs are padded to the standard size
 * (see the emit path). */
#define STD_NETFLOW_PDU
104
/*
 * Indices into the parms[] option table below — one enumerator per
 * command-line option letter.
 * NOTE(review): this enum and the parms[] initializer must stay in
 * exactly the same order, because code indexes the table with these
 * constants (e.g. the mark_is_tos macro reads parms[Mflag].count).
 */
enum {
	aflag,
	Bflag,
	bflag,
	cflag,
	dflag,
	Dflag,
	eflag,
	Eflag,
	fflag,
	gflag,
	hflag,
	lflag,
	mflag,
	Mflag,
	nflag,
	qflag,
	rflag,
	sflag,
	tflag,
	Tflag,
	Uflag,
	uflag,
	vflag,
	Wflag,
	Xflag,
};
132
133 static struct getopt_parms parms[] = {
134         {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
135         {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
136         {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
137         {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
138         {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
139         {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
140         {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
141         {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
142         {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
143         {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
144         {'h', 0, 0, 0},
145         {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
146         {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
147         {'M', 0, 0, 0},
148         {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
149         {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
150         {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
151         {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
152         {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
153         {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
154         {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
155         {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
156         {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
157         {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
158         {0, 0, 0, 0}
159 };
160
/* getopt(3) interface (also declared in <unistd.h>). */
extern char *optarg;
extern int optind, opterr, optopt;
/* NOTE(review): `extern int errno;` is obsolete — <errno.h> (included
 * above) already declares errno, which on modern libcs expands to a
 * thread-local accessor; this line only compiles via that macro
 * expansion and should eventually be removed. */
extern int errno;

/* Wire-format descriptors for the supported NetFlow versions
 * (defined in netflow.c; selected via -n). */
extern struct NetFlow NetFlow1;
extern struct NetFlow NetFlow5;
extern struct NetFlow NetFlow7;

/* Sentinel for get_data_file_fd(): "no data file open yet". */
#define START_DATA_FD -5
/* -M: interpret the netfilter mark value as the ToS byte. */
#define mark_is_tos parms[Mflag].count
static unsigned scan_interval = 5;	/* -s: expired-flow scan period (s) */
static unsigned int min_free = 0;	/* -D: free blocks to preserve on disk */
static int frag_lifetime = 30;		/* -g: fragmented-flow lifetime (s) */
static int inactive_lifetime = 60;	/* inactive-flow timeout (s) */
static int active_lifetime = 300;	/* -e: active-flow lifetime (s) */
static int sockbufsize;			/* -B: kernel capture buffer size */
#define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
#if (MEM_BITS == 0) || (MEM_BITS == 16)
#define BULK_QUANTITY 10000
#else
#define BULK_QUANTITY 200
#endif

/* Log-rotation state: epoch length (-E, minutes), number of epochs to
 * cycle through (-T), current epoch index, uptime at last rotation,
 * and the highest epoch index reached so far. */
static unsigned epoch_length=60, log_epochs=1; 
static unsigned cur_epoch=0,prev_uptime=0,last_peak=0;

static unsigned bulk_quantity = BULK_QUANTITY;	/* -b: memory bulk size */
static unsigned pending_queue_length = 100;	/* -q: pending queue length */
static struct NetFlow *netflow = &NetFlow5;	/* -n: emitted version (default 5) */
static unsigned verbosity = 6;			/* -v: max log level */
static unsigned log_dest = MY_LOG_SYSLOG;	/* -l: log destination */
static struct Time start_time;			/* reference point for uptime */
static long start_time_offset;
static int off_tl;
/* From mem.c */
extern unsigned total_elements;
extern unsigned free_elements;
extern unsigned total_memory;
#if ((DEBUG) & DEBUG_I)
/* Instrumentation counters, only present in DEBUG_I builds
 * (dumped by info_debug()). */
static unsigned emit_pkts, emit_queue;
static uint64_t size_total;
static unsigned pkts_total, pkts_total_fragmented;
static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
static unsigned pkts_pending, pkts_pending_done;
static unsigned pending_queue_trace, pending_queue_trace_candidate;
static unsigned flows_total, flows_fragmented;
#endif
/* Emitter state: flows in the PDU being built, NetFlow sequence
 * counter, optional -t rate limiting, timestamp of the current PDU,
 * and the PDU buffer itself. */
static unsigned emit_count;
static uint32_t emit_sequence;
static unsigned emit_rate_bytes, emit_rate_delay;
static struct Time emit_time;
static uint8_t emit_packet[NETFLOW_MAX_PACKET];
static pthread_t thid;
static sigset_t sig_mask;
static struct sched_param schedp;	/* -r: real-time scheduling parameters */
static int sched_min, sched_max;
static int npeers, npeers_rot;		/* collectors, and how many rotate */
static struct peer *peers;
static int sigs;			/* bitmask of signals seen by sighandler() */

/* Active-flow hash table: one chain and one mutex per bucket. */
static struct Flow *flows[1 << HASH_BITS];
static pthread_mutex_t flows_mutex[1 << HASH_BITS];

static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;

static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
static struct Flow *pending_head, *pending_tail;
static struct Flow *scan_frag_dreg;

static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
static struct Flow *flows_emit;		/* expired flows queued for emission */

static char ident[256] = "fprobe-ulog";	/* -l: log/pidfile identifier */
static FILE *pidfile;
static char *pidfilepath;
static pid_t pid;
static int killed;
static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
static struct ipulog_handle *ulog_handle;	/* ULOG capture handle */
static uint32_t ulog_gmask = 1;			/* -U: ULOG group bitmask */
static char *cap_buf;
static int nsnmp_rules;				/* -X: name->SNMP-index rules */
static struct snmp_rule *snmp_rules;
static struct passwd *pw = 0;			/* -u: account to drop to */
248
/*
 * Print the command-line help to stdout and exit(0).
 * NOTE(review): the help text lists -G and -y, which have no entries
 * in the parms[] option table in this file — confirm whether they are
 * parsed at all.  The "idetifier" typo in the -l line is inside a
 * runtime string and is deliberately left untouched here.
 */
void usage()
{
	fprintf(stdout,
			"fprobe-ulog: a NetFlow probe. Version %s\n"
			"Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
			"\n"
			"-h\t\tDisplay this help\n"
			"-U <mask>\tULOG group bitwise mask [1]\n"
			"-s <seconds>\tHow often scan for expired flows [5]\n"
			"-g <seconds>\tFragmented flow lifetime [30]\n"
			"-e <seconds>\tActive flow lifetime (active timer) [300]\n"
			"-f <filename>\tLog flow data in a file\n"
			"-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
			"-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
			"-a <address>\tUse <address> as source for NetFlow flow\n"
			"-X <rules>\tInterface name to SNMP-index conversion rules\n"
			"-M\t\tUse netfilter mark value as ToS flag\n"
			"-b <flows>\tMemory bulk size (1..%u) [%u]\n"
			"-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
			"-q <flows>\tPending queue length [100]\n"
			"-B <kilobytes>\tKernel capture buffer size [0]\n"
			"-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
			"-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
			"-c <directory>\tDirectory to chroot to\n"
			"-u <user>\tUser to run as\n"
			"-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
			"-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
			"-y <remote:port>\tAddress of the NetFlow collector\n"
			"-f <writable file>\tFile to write data into\n"
			"-T <n>\tRotate log file every n epochs\n"
			"-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
			"-E <[1..60]>\tSize of an epoch in minutes\n"
			"-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
			,
		VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
	exit(0);
}
286
#if ((DEBUG) & DEBUG_I)
/* Dump the DEBUG_I instrumentation counters to the log: packet
 * totals, pending-queue depth, loss counters, flow-cache and emitter
 * statistics, and mem.c pool usage.  sighandler() records SIGUSR1 in
 * `sigs`; presumably the main loop calls this in response — confirm
 * at the (not visible) call site. */
void info_debug()
{
	my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
			pkts_total, pkts_total_fragmented, size_total,
			pkts_pending - pkts_pending_done, pending_queue_trace);
	my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
			pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
	my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
			flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
	my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
			total_elements, free_elements, total_memory);
}
#endif
301
302 void sighandler(int sig)
303 {
304         switch (sig) {
305                 case SIGTERM:
306                         sigs |= SIGTERM_MASK;
307                         break;
308 #if ((DEBUG) & DEBUG_I)
309                 case SIGUSR1:
310                         sigs |= SIGUSR1_MASK;
311                         break;
312 #endif
313         }
314 }
315
316 void gettime(struct Time *now)
317 {
318         struct timeval t;
319
320         gettimeofday(&t, 0);
321         now->sec = t.tv_sec;
322         now->usec = t.tv_usec;
323 }
324
325
326 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
327 {
328         return (t1->sec - t2->sec)/60;
329 }
330
331 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
332 {
333         return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
334 }
335
336 /* Uptime in miliseconds */
337 uint32_t getuptime(struct Time *t)
338 {
339         /* Maximum uptime is about 49/2 days */
340         return cmpmtime(t, &start_time);
341 }
342
343 /* Uptime in minutes */
344 uint32_t getuptime_minutes(struct Time *t)
345 {
346         /* Maximum uptime is about 49/2 days */
347         return cmpMtime(t, &start_time);
348 }
349
350 hash_t hash_flow(struct Flow *flow)
351 {
352         if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
353         else return hash(flow, sizeof(struct Flow_TL));
354 }
355
356 uint16_t snmp_index(char *name) {
357         uint32_t i;
358
359         if (!*name) return 0;
360
361         for (i = 0; (int) i < nsnmp_rules; i++) {
362                 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
363                 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
364         }
365
366         if ((i = if_nametoindex(name))) return i;
367
368         return -1;
369 }
370
/*
 * Copy the payload fields of *src into *dst.
 * Deliberately field-by-field rather than a struct assignment: the
 * `next` chain pointer of *dst must be preserved (callers link the
 * copy into a hash bucket afterwards, see put_into()).
 */
inline void copy_flow(struct Flow *src, struct Flow *dst)
{
	dst->iif = src->iif;
	dst->oif = src->oif;
	dst->sip = src->sip;
	dst->dip = src->dip;
	dst->tos = src->tos;
	dst->xid = src->xid;
	dst->proto = src->proto;
	dst->tcp_flags = src->tcp_flags;
	dst->id = src->id;
	dst->sp = src->sp;
	dst->dp = src->dp;
	dst->pkts = src->pkts;
	dst->size = src->size;
	dst->sizeF = src->sizeF;
	dst->sizeP = src->sizeP;
	dst->ctime = src->ctime;
	dst->mtime = src->mtime;
	dst->flags = src->flags;
}
392
393 void read_cur_epoch() {
394         int fd;
395         /* Reset to -1 in case the read fails */
396         cur_epoch=-1;
397         fd = open(LAST_EPOCH_FILE, O_RDONLY);
398         if (fd != -1) {
399                 char snum[MAX_EPOCH_SIZE];
400                 ssize_t len;
401                 len = read(fd, snum, MAX_EPOCH_SIZE-1);
402                 if (len != -1) {
403                         snum[len]='\0';
404                         sscanf(snum,"%d",&cur_epoch);
405                         cur_epoch++; /* Let's not stone the last epoch */
406                         close(fd);
407                 }
408         }
409         return;
410 }
411
412
413 /* Dumps the current epoch in a file to cope with
414  * reboots and killings of fprobe */
415
416 void update_cur_epoch_file(int n) {
417         int fd, len;
418         char snum[MAX_EPOCH_SIZE];
419         len=snprintf(snum, MAX_EPOCH_SIZE-1,"%d", n);
420         fd = open(LAST_EPOCH_FILE, O_RDWR|O_CREAT|O_TRUNC);
421         if (fd == -1) {
422                 my_log(LOG_ERR, "open() failed: %s.The next restart will resume logging from epoch id 0.",LAST_EPOCH_FILE);
423                 return;
424         }
425         write(fd, snum, len);
426         close(fd);
427 }
428
429 /* Get the file descriptor corresponding to the current file.
430  * The kludgy implementation is to abstract away the 'current
431  * file descriptor', which may also be a socket. 
432  */
433
434 unsigned get_data_file_fd(char *fname, int cur_fd) {
435         struct Time now;
436         unsigned cur_uptime;
437
438         struct statfs statfs;
439         int ret_fd;
440
441         /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
442          * doesn't solve the problem */
443         gettime(&now);
444         cur_uptime = getuptime_minutes(&now);
445
446         if (cur_fd != START_DATA_FD) {
447                 if (fstatfs(cur_fd, &statfs) == -1) {
448                         my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
449                 }
450                 else {
451                         if (min_free && (statfs.f_bavail < min_free)
452                                         && (cur_epoch==last_peak))
453                         {
454                                 my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
455                                 cur_epoch = -1;
456                         }
457                         /*
458                            else 
459                            assume that we can reclaim space by overwriting our own files
460                            and that the difference in size will not fill the disk - sapan
461                            */
462                 }
463         }
464
465         /* If epoch length has been exceeded, 
466          * or we're starting up 
467          * or we're going back to the first epoch */
468         if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
469                 char nextname[MAX_PATH_LEN];
470                 int write_fd;
471                 prev_uptime = cur_uptime;
472                 cur_epoch = (cur_epoch + 1) % log_epochs;
473                 if (cur_epoch>last_peak) last_peak = cur_epoch;
474                 if (cur_fd>0)
475                         close(cur_fd);
476                 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
477                 if ((write_fd = open(nextname, O_RDWR|O_CREAT|O_TRUNC)) < 0) {
478                         my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
479                         exit(1);
480                 }
481                 if (fchmod(write_fd,S_IRUSR|S_IWUSR|S_IROTH|S_IRGRP) == -1) {
482                         my_log(LOG_ERR, "fchmod() failed: %s (%s). Continuing...\n", nextname, strerror(errno));
483                 }
484                 update_cur_epoch_file(cur_epoch);
485                 ret_fd = write_fd;
486         }
487         else {
488                 ret_fd = cur_fd;
489         }
490         return(ret_fd);
491 }
492
/*
 * Linear search of one hash chain.  `where` is the chain head and
 * `what` the key flow.  Flows match on source/destination address and
 * protocol; then, via the FLOW_FRAGMASK sum trick, two unfragmented
 * flows additionally compare ports (case 0) and two fragmented flows
 * compare the IP id (case 2) — a fragmented/unfragmented pair sums to
 * 1 and never matches.
 *
 * If `prev` is non-NULL, *prev is updated to the link pointer that
 * holds the returned flow (so the caller can unlink it); when no
 * match exists, NULL is returned and *prev ends up at the chain's
 * terminating next pointer (or stays at its input value for an empty
 * chain).
 */
struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
{
	struct Flow **flowpp;

#ifdef WALL
	flowpp = 0;
#endif

	if (prev) flowpp = *prev;

	while (where) {
		if (where->sip.s_addr == what->sip.s_addr
				&& where->dip.s_addr == what->dip.s_addr
				&& where->proto == what->proto) {
			switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
				case 0:
					/* Both unfragmented */
					if ((what->sp == where->sp)
							&& (what->dp == where->dp)) goto done;
					break;
				case 2:
					/* Both fragmented */
					if (where->id == what->id) goto done;
					break;
			}
		}
		flowpp = &where->next;
		where = where->next;
	}
done:
	if (prev) *prev = flowpp;
	return where;
}
526
	/*
	 * Insert `flow` into the flow hash table, or merge it into an
	 * existing matching entry.  `flag` is COPY_INTO (allocate a
	 * private copy of the caller's flow) or MOVE_INTO (ownership is
	 * transferred; the flow is freed here after merging).  Returns 0
	 * on success, -1 if a required allocation failed.
	 */
	int put_into(struct Flow *flow, int flag
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
			, char *logbuf
#endif
		    )
{
	int ret = 0;
	hash_t h;
	struct Flow *flown, **flowpp;
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
	char buf[64];
#endif

	h = hash_flow(flow);
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
	sprintf(buf, " %x H:%04x", (unsigned) flow, h);
	strcat(logbuf, buf);
#endif
	pthread_mutex_lock(&flows_mutex[h]);
	flowpp = &flows[h];
	if (!(flown = find(flows[h], flow, &flowpp))) {
		/* No suitable flow found - add */
		if (flag == COPY_INTO) {
			if ((flown = mem_alloc())) {
				copy_flow(flow, flown);
				flow = flown;
			} else {
#if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
				my_log(LOG_ERR, "%s %s. %s",
						"mem_alloc():", strerror(errno), "packet lost");
#endif
				/* NOTE(review): this returns with flows_mutex[h]
				 * still locked — looks like a mutex leak; confirm
				 * and add an unlock before the return. */
				return -1;
			}
		}
		/* Link at the head of the bucket chain. */
		flow->next = flows[h];
		flows[h] = flow;
#if ((DEBUG) & DEBUG_I)
		flows_total++;
		if (flow->flags & FLOW_FRAG) flows_fragmented++;
#endif
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
		if (flown) {
			sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
			strcat(logbuf, buf);
		}
#endif
	} else {
		/* Found suitable flow - update */
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
		sprintf(buf, " +> %x", (unsigned) flown);
		strcat(logbuf, buf);
#endif
		/* Keep the widest [ctime, mtime] interval seen so far. */
		if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
			flown->mtime = flow->mtime;
		if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
			flown->ctime = flow->ctime;
		flown->tcp_flags |= flow->tcp_flags;
		flown->size += flow->size;
		flown->pkts += flow->pkts;

		/* The xid of the first xid of a flow is misleading. Reset the xid of the flow
		 * if a better value comes along. A good example of this is that by the time CoDemux sets the
		 * peercred of a flow, it has already been accounted for here and attributed to root. */

		if (flown->xid<1)
				flown->xid = flow->xid;


		if (flow->flags & FLOW_FRAG) {
			/* Fragmented flow require some additional work */
			if (flow->flags & FLOW_TL) {
				/*
				   ?FIXME?
				   Several packets with FLOW_TL (attack)
				   */
				flown->sp = flow->sp;
				flown->dp = flow->dp;
			}
			if (flow->flags & FLOW_LASTFRAG) {
				/*
				   ?FIXME?
				   Several packets with FLOW_LASTFRAG (attack)
				   */
				flown->sizeP = flow->sizeP;
			}
			flown->flags |= flow->flags;
			flown->sizeF += flow->sizeF;
			if ((flown->flags & FLOW_LASTFRAG)
					&& (flown->sizeF >= flown->sizeP)) {
				/* All fragments received - flow reassembled */
				*flowpp = flown->next;
				/* NOTE(review): the bucket mutex is unlocked here,
				 * yet control falls through to the unconditional
				 * unlock at the end of this function — the
				 * reassembly path appears to unlock flows_mutex[h]
				 * twice (undefined behaviour for a default mutex);
				 * confirm and restructure. */
				pthread_mutex_unlock(&flows_mutex[h]);
#if ((DEBUG) & DEBUG_I)
				flows_total--;
				flows_fragmented--;
#endif
				flown->id = 0;
				flown->flags &= ~FLOW_FRAG;
#if ((DEBUG) & (DEBUG_U | DEBUG_S))
				strcat(logbuf," R");
#endif
				/* Re-insert the reassembled flow as a normal one. */
				ret = put_into(flown, MOVE_INTO
#if ((DEBUG) & (DEBUG_U | DEBUG_S))
						, logbuf
#endif
					      );
			}
		}
		if (flag == MOVE_INTO) mem_free(flow);
	}
	pthread_mutex_unlock(&flows_mutex[h]);
	return ret;
}
640
/* Guard intended to make fill() log its "SRC/DST masks are broken"
 * warning only once. */
int onlyonce=0;
642
643 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
644 {
645         int i;
646
647         for (i = 0; i < fields; i++) {
648 #if ((DEBUG) & DEBUG_F)
649                 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
650 #endif
651                 switch (format[i]) {
652                         case NETFLOW_IPV4_SRC_ADDR:
653                                 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
654                                 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
655                                 break;
656
657                         case NETFLOW_IPV4_DST_ADDR:
658                                 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
659                                 if ((flow->dip.s_addr == inet_addr("10.0.0.8"))) {
660                                         my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
661                                 }
662                                 p += NETFLOW_IPV4_DST_ADDR_SIZE;
663                                 break;
664
665                         case NETFLOW_INPUT_SNMP:
666                                 *((uint16_t *) p) = htons(flow->iif);
667                                 p += NETFLOW_INPUT_SNMP_SIZE;
668                                 break;
669
670                         case NETFLOW_OUTPUT_SNMP:
671                                 *((uint16_t *) p) = htons(flow->oif);
672                                 p += NETFLOW_OUTPUT_SNMP_SIZE;
673                                 break;
674
675                         case NETFLOW_PKTS_32:
676                                 *((uint32_t *) p) = htonl(flow->pkts);
677                                 p += NETFLOW_PKTS_32_SIZE;
678                                 break;
679
680                         case NETFLOW_BYTES_32:
681                                 *((uint32_t *) p) = htonl(flow->size);
682                                 p += NETFLOW_BYTES_32_SIZE;
683                                 break;
684
685                         case NETFLOW_FIRST_SWITCHED:
686                                 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
687                                 p += NETFLOW_FIRST_SWITCHED_SIZE;
688                                 break;
689
690                         case NETFLOW_LAST_SWITCHED:
691                                 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
692                                 p += NETFLOW_LAST_SWITCHED_SIZE;
693                                 break;
694
695                         case NETFLOW_L4_SRC_PORT:
696                                 *((uint16_t *) p) = flow->sp;
697                                 p += NETFLOW_L4_SRC_PORT_SIZE;
698                                 break;
699
700                         case NETFLOW_L4_DST_PORT:
701                                 *((uint16_t *) p) = flow->dp;
702                                 p += NETFLOW_L4_DST_PORT_SIZE;
703                                 break;
704
705                         case NETFLOW_PROT:
706                                 *((uint8_t *) p) = flow->proto;
707                                 p += NETFLOW_PROT_SIZE;
708                                 break;
709
710                         case NETFLOW_SRC_TOS:
711                                 *((uint8_t *) p) = flow->tos;
712                                 p += NETFLOW_SRC_TOS_SIZE;
713                                 break;
714
715                         case NETFLOW_TCP_FLAGS:
716                                 *((uint8_t *) p) = flow->tcp_flags;
717                                 p += NETFLOW_TCP_FLAGS_SIZE;
718                                 break;
719
720                         case NETFLOW_VERSION:
721                                 *((uint16_t *) p) = htons(netflow->Version);
722                                 p += NETFLOW_VERSION_SIZE;
723                                 break;
724
725                         case NETFLOW_COUNT:
726                                 *((uint16_t *) p) = htons(emit_count);
727                                 p += NETFLOW_COUNT_SIZE;
728                                 break;
729
730                         case NETFLOW_UPTIME:
731                                 *((uint32_t *) p) = htonl(getuptime(&emit_time));
732                                 p += NETFLOW_UPTIME_SIZE;
733                                 break;
734
735                         case NETFLOW_UNIX_SECS:
736                                 *((uint32_t *) p) = htonl(emit_time.sec);
737                                 p += NETFLOW_UNIX_SECS_SIZE;
738                                 break;
739
740                         case NETFLOW_UNIX_NSECS:
741                                 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
742                                 p += NETFLOW_UNIX_NSECS_SIZE;
743                                 break;
744
745                         case NETFLOW_FLOW_SEQUENCE:
746                                 //*((uint32_t *) p) = htonl(emit_sequence);
747                                 *((uint32_t *) p) = 0;
748                                 p += NETFLOW_FLOW_SEQUENCE_SIZE;
749                                 break;
750
751                         case NETFLOW_PAD8:
752                                 /* Unsupported (uint8_t) */
753                         case NETFLOW_ENGINE_TYPE:
754                         case NETFLOW_ENGINE_ID:
755                         case NETFLOW_FLAGS7_1:
756                         case NETFLOW_SRC_MASK:
757                         case NETFLOW_DST_MASK:
758                                 if (onlyonce) {
759                                         my_log(LOG_CRIT, "Adding SRC/DST masks: this version of fprobe is seriously broken\n");
760                                         onlyonce=1;
761                                 }
762                                 *((uint8_t *) p) = 0;
763                                 p += NETFLOW_PAD8_SIZE;
764                                 break;
765                         case NETFLOW_XID:
766                                 *((uint32_t *) p) = flow->xid;
767                                 p += NETFLOW_XID_SIZE;
768                                 break;
769                         case NETFLOW_PAD16:
770                                 /* Unsupported (uint16_t) */
771                         case NETFLOW_SRC_AS:
772                         case NETFLOW_DST_AS:
773                         case NETFLOW_FLAGS7_2:
774                                 *((uint16_t *) p) = 0;
775                                 p += NETFLOW_PAD16_SIZE;
776                                 break;
777
778                         case NETFLOW_PAD32:
779                                 /* Unsupported (uint32_t) */
780                         case NETFLOW_IPV4_NEXT_HOP:
781                         case NETFLOW_ROUTER_SC:
782                                 *((uint32_t *) p) = 0;
783                                 p += NETFLOW_PAD32_SIZE;
784                                 break;
785
786                         default:
787                                 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
788                                                 format, i, format[i]);
789                                 exit(1);
790                 }
791         }
792 #if ((DEBUG) & DEBUG_F)
793         my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
794 #endif
795         return p;
796 }
797
798 void setuser() {
799         /*
800            Workaround for clone()-based threads
801            Try to change EUID independently of main thread
802            */
803         if (pw) {
804                 setgroups(0, NULL);
805                 setregid(pw->pw_gid, pw->pw_gid);
806                 setreuid(pw->pw_uid, pw->pw_uid);
807         }
808 }
809
810 void *emit_thread()
811 {
812         struct Flow *flow;
813         void *p;
814         struct timeval now;
815         struct timespec timeout;
816         int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
817
818         p = (void *) &emit_packet + netflow->HeaderSize;
819         timeout.tv_nsec = 0;
820
821         setuser();
822
823         for (;;) {
824                 pthread_mutex_lock(&emit_mutex);
825                 while (!flows_emit) {
826                         gettimeofday(&now, 0);
827                         timeout.tv_sec = now.tv_sec + emit_timeout;
828                         /* Do not wait until emit_packet will filled - it may be too long */
829                         if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
830                                 pthread_mutex_unlock(&emit_mutex);
831                                 goto sendit;
832                         }
833                 }
834                 flow = flows_emit;
835                 flows_emit = flows_emit->next;
836 #if ((DEBUG) & DEBUG_I)
837                 emit_queue--;
838 #endif          
839                 pthread_mutex_unlock(&emit_mutex);
840
841 #ifdef UPTIME_TRICK
842                 if (!emit_count) {
843                         gettime(&start_time);
844                         start_time.sec -= start_time_offset;
845                 }
846 #endif
847                 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
848                 mem_free(flow);
849                 emit_count++;
850 #ifdef PF2_DEBUG
851                 printf("Emit count = %d\n", emit_count);
852                 fflush(stdout);
853 #endif
854                 if (emit_count == netflow->MaxFlows) {
855 sendit:
856                         gettime(&emit_time);
857                         p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
858                         size = netflow->HeaderSize + emit_count * netflow->FlowSize;
859                         /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
860 #ifdef STD_NETFLOW_PDU
861                         if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
862 #endif
863                         peer_rot_cur = 0;
864                         for (i = 0; i < npeers; i++) {
865                                 if (peers[i].type == PEER_FILE) {
866                                         if (netflow->SeqOffset)
867                                                 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
868                                         peers[i].write_fd = get_data_file_fd(peers[i].fname, peers[i].write_fd);
869                                         ret = write(peers[i].write_fd, emit_packet, size);
870                                         if (ret < size) {
871
872 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
873                                                 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
874                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
875 #endif
876 #undef MESSAGES
877                                         }
878 #if ((DEBUG) & DEBUG_E)
879                                         commaneelse {
880                                                 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
881                                                                 emit_count, i + 1, peers[i].seq);
882                                         }
883 #endif
884                                         peers[i].seq += emit_count;
885
886                                         /* Rate limit */
887                                         if (emit_rate_bytes) {
888                                                 sent += size;
889                                                 delay = sent / emit_rate_bytes;
890                                                 if (delay) {
891                                                         sent %= emit_rate_bytes;
892                                                         timeout.tv_sec = 0;
893                                                         timeout.tv_nsec = emit_rate_delay * delay;
894                                                         while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
895                                                 }
896                                         }
897                                 }
898                                 else
899                                         if (peers[i].type == PEER_MIRROR) goto sendreal;
900                                         else
901                                                 if (peers[i].type == PEER_ROTATE) 
902                                                         if (peer_rot_cur++ == peer_rot_work) {
903 sendreal:
904                                                                 if (netflow->SeqOffset)
905                                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
906                                                                 ret = send(peers[i].write_fd, emit_packet, size, 0);
907                                                                 if (ret < size) {
908 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
909                                                                         my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
910                                                                                         i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
911 #endif
912                                                                 }
913 #if ((DEBUG) & DEBUG_E)
914                                                                 commaneelse {
915                                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
916                                                                                         emit_count, i + 1, peers[i].seq);
917                                                                 }
918 #endif
919                                                                 peers[i].seq += emit_count;
920
921                                                                 /* Rate limit */
922                                                                 if (emit_rate_bytes) {
923                                                                         sent += size;
924                                                                         delay = sent / emit_rate_bytes;
925                                                                         if (delay) {
926                                                                                 sent %= emit_rate_bytes;
927                                                                                 timeout.tv_sec = 0;
928                                                                                 timeout.tv_nsec = emit_rate_delay * delay;
929                                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
930                                                                         }
931                                                                 }
932                                                         }
933                         }
934                         if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
935                         emit_sequence += emit_count;
936                         emit_count = 0;
937 #if ((DEBUG) & DEBUG_I)
938                         emit_pkts++;
939 #endif
940                 }
941         }
942 }       
943
/*
 * unpending_thread(): consumer side of the pending ring filled by
 * cap_thread().  Copies each slot marked FLOW_PENDING into the flow
 * hash table via put_into(..., COPY_INTO), then clears the slot so
 * cap_thread() can reuse it.  Runs forever under unpending_mutex.
 */
void *unpending_thread()
{
	struct timeval now;
	struct timespec timeout;
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
	char logbuf[256];
#endif

	setuser();

	timeout.tv_nsec = 0;
	pthread_mutex_lock(&unpending_mutex);

	for (;;) {
		/* Sleep until cap_thread() signals, or at most unpending_timeout
		   seconds, then re-check the tail slot's FLOW_PENDING flag. */
		while (!(pending_tail->flags & FLOW_PENDING)) {
			gettimeofday(&now, 0);
			timeout.tv_sec = now.tv_sec + unpending_timeout;
			pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
		}

#if ((DEBUG) & (DEBUG_S | DEBUG_U))
		*logbuf = 0;
#endif
		/* Copy the flow into the hash table; a negative return means it
		   could not be stored and the packet is accounted as lost. */
		if (put_into(pending_tail, COPY_INTO
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
					, logbuf
#endif
			    ) < 0) {
#if ((DEBUG) & DEBUG_I)
			pkts_lost_unpending++;
#endif
		}

#if ((DEBUG) & DEBUG_U)
		my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
#endif

		/* Release the slot back to cap_thread() and advance the tail. */
		pending_tail->flags = 0;
		pending_tail = pending_tail->next;
#if ((DEBUG) & DEBUG_I)
		pkts_pending_done++;
#endif
	}
}
988
/*
 * scan_thread(): wakes up every scan_interval seconds and walks the
 * entire flow hash table, expiring entries:
 *  - fragmented flows older than frag_lifetime are unlinked into the
 *    scan_frag_dreg chain and later re-inserted as ordinary flows
 *    (put_into(..., MOVE_INTO));
 *  - ordinary flows idle longer than inactive_lifetime, or active
 *    longer than active_lifetime, are moved onto the flows_emit list
 *    for emit_thread() to send.
 * Each hash bucket is protected by its own mutex in flows_mutex[].
 */
void *scan_thread()
{
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
	char logbuf[256];
#endif
	int i;
	struct Flow *flow, **flowpp;
	struct Time now;
	struct timespec timeout;

	setuser();

	timeout.tv_nsec = 0;
	pthread_mutex_lock(&scan_mutex);

	for (;;) {
		gettime(&now);
		timeout.tv_sec = now.sec + scan_interval;
		pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);

		gettime(&now);
#if ((DEBUG) & DEBUG_S)
		my_log(LOG_DEBUG, "S: %d", now.sec);
#endif
		for (i = 0; i < 1 << HASH_BITS ; i++) {
			pthread_mutex_lock(&flows_mutex[i]);
			flow = flows[i];
			/* flowpp always points at the link that points at 'flow',
			   so unlinking is a single store through *flowpp. */
			flowpp = &flows[i];
			while (flow) {
				if (flow->flags & FLOW_FRAG) {
					/* Process fragmented flow */
					if ((now.sec - flow->mtime.sec) > frag_lifetime) {
						/* Fragmented flow expired - put it into special chain */
#if ((DEBUG) & DEBUG_I)
						flows_fragmented--;
						flows_total--;
#endif
						*flowpp = flow->next;
						flow->id = 0;
						flow->flags &= ~FLOW_FRAG;
						flow->next = scan_frag_dreg;
						scan_frag_dreg = flow;
						flow = *flowpp;
						continue;
					}
				} else {
					/* Flow is not fragmented */
					if ((now.sec - flow->mtime.sec) > inactive_lifetime
							|| (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
						/* Flow expired - unlink it and hand it to emit_thread() */
#if ((DEBUG) & DEBUG_S)
						my_log(LOG_DEBUG, "S: E %x", flow);
#endif
#if ((DEBUG) & DEBUG_I)
						flows_total--;
#endif
						*flowpp = flow->next;
						pthread_mutex_lock(&emit_mutex);
						flow->next = flows_emit;
						flows_emit = flow;
#if ((DEBUG) & DEBUG_I)
						emit_queue++;
#endif
						pthread_mutex_unlock(&emit_mutex);
						flow = *flowpp;
						continue;
					}
				}
				flowpp = &flow->next;
				flow = flow->next;
			} /* chain loop */
			pthread_mutex_unlock(&flows_mutex[i]);
		} /* hash loop */
		if (flows_emit) pthread_cond_signal(&emit_cond);

		/* Re-insert expired fragmented flows as ordinary flows. */
		while (scan_frag_dreg) {
			flow = scan_frag_dreg;
			scan_frag_dreg = flow->next;
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
			*logbuf = 0;
#endif
			put_into(flow, MOVE_INTO
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
					, logbuf
#endif
				);
#if ((DEBUG) & DEBUG_S)
			my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
#endif
		}
	}
}
1081
/*
 * cap_thread(): capture loop.  Reads ULOG packets from the netlink
 * socket, validates the IPv4 header, fills the next free slot of the
 * pending ring (pending_head) with a Flow record (addresses, ports,
 * sizes, fragmentation state, TCP flags, PlanetLab xid), then marks
 * the slot FLOW_PENDING and signals unpending_thread().  Exits when
 * the 'killed' flag is set by a signal handler.
 */
void *cap_thread()
{
	struct ulog_packet_msg *ulog_msg;
	struct ip *nl;
	void *tl;
	struct Flow *flow;
	int len, off_frag, psize;
#if ((DEBUG) & DEBUG_C)
	char buf[64];
	char logbuf[256];
#endif
	/* NOTE(review): 'challenge' is never assigned anywhere in this
	   function, yet it is read in the xid check below - that read is
	   undefined behavior.  In practice the comparison is almost always
	   unequal, so flow->xid falls back to ulog_msg->mark.  Confirm the
	   intended semantics (possibly leftover from the test facility). */
	int challenge;

	setuser();

	while (!killed) {
		len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
		if (len <= 0) {
			my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
			continue;
		}
		/* A single read may contain several ULOG messages. */
		while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {

#if ((DEBUG) & DEBUG_C)
			sprintf(logbuf, "C: %d", ulog_msg->data_len);
#endif

			nl = (void *) &ulog_msg->payload;
			psize = ulog_msg->data_len;

			/* Sanity check: must hold at least an IPv4 header. */
			if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
#if ((DEBUG) & DEBUG_C)
				strcat(logbuf, " U");
				my_log(LOG_DEBUG, "%s", logbuf);
#endif
#if ((DEBUG) & DEBUG_I)
				pkts_ignored++;
#endif
				continue;
			}

			/* Head slot still owned by unpending_thread(): ring full,
			   the packet is dropped. */
			if (pending_head->flags) {
#if ((DEBUG) & DEBUG_C) || defined MESSAGES
				my_log(LOG_ERR,
# if ((DEBUG) & DEBUG_C)
						"%s %s %s", logbuf,
# else
						"%s %s",
# endif
						"pending queue full:", "packet lost");
#endif
#if ((DEBUG) & DEBUG_I)
				pkts_lost_capture++;
#endif
				goto done;
			}

#if ((DEBUG) & DEBUG_I)
			pkts_total++;
#endif

			flow = pending_head;

			/* ?FIXME? Add sanity check for ip_len? */
			flow->size = ntohs(nl->ip_len);
#if ((DEBUG) & DEBUG_I)
			size_total += flow->size;
#endif

			flow->sip = nl->ip_src;
			flow->dip = nl->ip_dst;
			/* With -M the netfilter mark is carried in the TOS field. */
			flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
			
			/* It's going to be expensive calling this syscall on every flow.
			 * We should keep a local hash table, for now just bear the overhead... - Sapan*/

			flow->xid=0;

			if (ulog_msg->mark > 0) {
				/* flow->xid is really the slice id :-/ */
				flow->xid = xid_to_slice_id(ulog_msg->mark);
			}

			/* NOTE(review): 'challenge' is uninitialized here - see the
			   declaration above.  Effectively this almost always resets
			   flow->xid to the raw netfilter mark. */
			if (flow->xid < 1 || flow->xid!=challenge) 
				flow->xid = ulog_msg->mark;


			/* Test facility: flag any traffic to/from 10.0.0.8. */
			if ((flow->dip.s_addr == inet_addr("10.0.0.8")) || (flow->sip.s_addr == inet_addr("10.0.0.8"))) {
				my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->xid);
			}
			flow->iif = snmp_index(ulog_msg->indev_name);
			flow->oif = snmp_index(ulog_msg->outdev_name);
			flow->proto = nl->ip_p;
			flow->id = 0;
			flow->tcp_flags = 0;
			flow->pkts = 1;
			flow->sizeF = 0;
			flow->sizeP = 0;
			/* Packets captured from OUTPUT table don't contain a valid timestamp */
			if (ulog_msg->timestamp_sec) {
				flow->ctime.sec = ulog_msg->timestamp_sec;
				flow->ctime.usec = ulog_msg->timestamp_usec;
			} else gettime(&flow->ctime);
			flow->mtime = flow->ctime;

			/* Fragment offset in bytes (IP_OFFMASK is in 8-byte units). */
			off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;

			/*
			   Offset (from network layer) to transport layer header/IP data
			   IOW IP header size ;-)

			   ?FIXME?
			   Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
			   */
			/* NOTE(review): off_tl is a file-scope variable, not a local. */
			off_tl = nl->ip_hl << 2;
			tl = (void *) nl + off_tl;

			/* THIS packet data size: data_size = total_size - ip_header_size*4 */
			flow->sizeF = ntohs(nl->ip_len) - off_tl;
			psize -= off_tl;
			if ((signed) flow->sizeF < 0) flow->sizeF = 0;
			if (psize > (signed) flow->sizeF) psize = flow->sizeF;

			if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
				/* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
#if ((DEBUG) & DEBUG_C)
				strcat(logbuf, " F");
#endif
#if ((DEBUG) & DEBUG_I)
				pkts_total_fragmented++;
#endif
				flow->flags |= FLOW_FRAG;
				flow->id = nl->ip_id;

				if (!(ntohs(nl->ip_off) & IP_MF)) {
					/* Packet with IP_MF == 0 (last fragment) tells us the whole datagram size */
					flow->flags |= FLOW_LASTFRAG;
					/* size = frag_offset*8 + data_size */
					flow->sizeP = off_frag + flow->sizeF;
				}
			}

#if ((DEBUG) & DEBUG_C)
			sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
			strcat(logbuf, buf);
			sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
			strcat(logbuf, buf);
#endif

			/*
			   Fortunately most interesting transport layer information fit
			   into first 8 bytes of IP data field (minimal nonzero size).
			   Thus we don't need actual packet reassembling to build whole
			   transport layer data. We only check the fragment offset for
			   zero value to find packet with this information.
			   */
			if (!off_frag && psize >= 8) {
				switch (flow->proto) {
					case IPPROTO_TCP:
					case IPPROTO_UDP:
						/* TCP and UDP keep ports in the same positions. */
						flow->sp = ((struct udphdr *)tl)->uh_sport;
						flow->dp = ((struct udphdr *)tl)->uh_dport;
						goto tl_known;

#ifdef ICMP_TRICK
					case IPPROTO_ICMP:
						/* Encode ICMP type/code in the port fields. */
						flow->sp = htons(((struct icmp *)tl)->icmp_type);
						flow->dp = htons(((struct icmp *)tl)->icmp_code);
						goto tl_known;
#endif
#ifdef ICMP_TRICK_CISCO
					case IPPROTO_ICMP:
						flow->dp = *((int32_t *) tl);
						goto tl_known;
#endif

					default:
						/* Unknown transport layer */
#if ((DEBUG) & DEBUG_C)
						strcat(logbuf, " U");
#endif
						flow->sp = 0;
						flow->dp = 0;
						break;

tl_known:
#if ((DEBUG) & DEBUG_C)
						sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
						strcat(logbuf, buf);
#endif
						flow->flags |= FLOW_TL;
				}
			}

			/* Check for tcp flags presence (including CWR and ECE). */
			if (flow->proto == IPPROTO_TCP
					&& off_frag < 16
					&& psize >= 16 - off_frag) {
				flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
#if ((DEBUG) & DEBUG_C)
				sprintf(buf, " TCP:%x", flow->tcp_flags);
				strcat(logbuf, buf);
#endif
			}

#if ((DEBUG) & DEBUG_C)
			sprintf(buf, " => %x", (unsigned) flow);
			strcat(logbuf, buf);
			my_log(LOG_DEBUG, "%s", logbuf);
#endif

#if ((DEBUG) & DEBUG_I)
			pkts_pending++;
			pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
			if (pending_queue_trace < pending_queue_trace_candidate)
				pending_queue_trace = pending_queue_trace_candidate;
#endif

			/* Flow complete - inform unpending_thread() about it */
			pending_head->flags |= FLOW_PENDING;
			pending_head = pending_head->next;
done:
			pthread_cond_signal(&unpending_cond);
		}
	}
	return 0;
}
1310
1311 /* Copied out of CoDemux */
1312
1313 static int init_daemon() {
1314         pid_t pid;
1315         FILE *pidfile;
1316
1317         pidfile = fopen(PIDFILE, "w");
1318         if (pidfile == NULL) {
1319                 my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
1320         }
1321
1322         if ((pid = fork()) < 0) {
1323                 fclose(pidfile);
1324                 my_log(LOG_ERR, "Could not fork!\n");
1325                 return(-1);
1326         }
1327         else if (pid != 0) {
1328                 /* i'm the parent, writing down the child pid  */
1329                 fprintf(pidfile, "%u\n", pid);
1330                 fclose(pidfile);
1331                 exit(0);
1332         }
1333
1334         /* close the pid file */
1335         fclose(pidfile);
1336
1337         /* routines for any daemon process
1338            1. create a new session 
1339            2. change directory to the root
1340            3. change the file creation permission 
1341            */
1342         setsid();
1343         chdir("/var/local/fprobe");
1344         umask(0);
1345
1346         return(0);
1347 }
1348
1349 int main(int argc, char **argv)
1350 {
1351         char errpbuf[512];
1352         char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1353         int c, i, write_fd, memory_limit = 0;
1354         struct addrinfo hints, *res;
1355         struct sockaddr_in saddr;
1356         pthread_attr_t tattr;
1357         struct sigaction sigact;
1358         static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1359         struct timeval timeout;
1360
1361         sched_min = sched_get_priority_min(SCHED);
1362         sched_max = sched_get_priority_max(SCHED);
1363
1364         memset(&saddr, 0 , sizeof(saddr));
1365         memset(&hints, 0 , sizeof(hints));
1366         hints.ai_flags = AI_PASSIVE;
1367         hints.ai_family = AF_INET;
1368         hints.ai_socktype = SOCK_DGRAM;
1369
1370         /* Process command line options */
1371
1372         opterr = 0;
1373         while ((c = my_getopt(argc, argv, parms)) != -1) {
1374                 switch (c) {
1375                         case '?':
1376                                 usage();
1377
1378                         case 'h':
1379                                 usage();
1380                 }
1381         }
1382
1383         if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1384         if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1385         if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1386         if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1387         if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1388         if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1389         if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1390         if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1391         if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1392         if (parms[nflag].count) {
1393                 switch (atoi(parms[nflag].arg)) {
1394                         case 1:
1395                                 netflow = &NetFlow1;
1396                                 break;
1397
1398                         case 5:
1399                                 break;
1400
1401                         case 7:
1402                                 netflow = &NetFlow7;
1403                                 break;
1404
1405                         default:
1406                                 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1407                                 exit(1);
1408                 }
1409         }
1410         if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1411         if (parms[lflag].count) {
1412                 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1413                         *log_suffix++ = 0;
1414                         if (*log_suffix) {
1415                                 sprintf(errpbuf, "[%s]", log_suffix);
1416                                 strcat(ident, errpbuf);
1417                         }
1418                 }
1419                 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1420                 if (log_suffix) *--log_suffix = ':';
1421         }
1422         if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1423 err_malloc:
1424                 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1425                 exit(1);
1426         }
1427         sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1428         if (parms[qflag].count) {
1429                 pending_queue_length = atoi(parms[qflag].arg);
1430                 if (pending_queue_length < 1) {
1431                         fprintf(stderr, "Illegal %s\n", "pending queue length");
1432                         exit(1);
1433                 }
1434         }
1435         if (parms[rflag].count) {
1436                 schedp.sched_priority = atoi(parms[rflag].arg);
1437                 if (schedp.sched_priority
1438                                 && (schedp.sched_priority < sched_min
1439                                         || schedp.sched_priority > sched_max)) {
1440                         fprintf(stderr, "Illegal %s\n", "realtime priority");
1441                         exit(1);
1442                 }
1443         }
1444         if (parms[Bflag].count) {
1445                 sockbufsize = atoi(parms[Bflag].arg) << 10;
1446         }
1447         if (parms[bflag].count) {
1448                 bulk_quantity = atoi(parms[bflag].arg);
1449                 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1450                         fprintf(stderr, "Illegal %s\n", "bulk size");
1451                         exit(1);
1452                 }
1453         }
        /* -m: memory limit is given in KiB on the command line; the shift converts to bytes. */
1454         if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
        /* -X: SNMP interface rules of the form "name1:base1,name2:base2,...".
           Counting ':' occurrences gives the number of rules to allocate. */
1455         if (parms[Xflag].count) {
1456                 for(i = 0; parms[Xflag].arg[i]; i++)
1457                         if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1458                 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1459                         goto err_malloc;
        /* strtok() tokenizes parms[Xflag].arg in place; the NUL separators it
           writes are patched back to ',' / ':' below so the argument string
           stays intact for later logging. */
1460                 rule = strtok(parms[Xflag].arg, ":");
1461                 for (i = 0; rule; i++) {
1462                         snmp_rules[i].len = strlen(rule);
        /* NOTE(review): if basename is a char[IFNAMSIZ] this check should be
           >= IFNAMSIZ, and the strncpy below copies len bytes with no
           terminating NUL -- confirm basename's declared size and that every
           reader uses .len rather than expecting a C string. */
1463                         if (snmp_rules[i].len > IFNAMSIZ) {
1464                                 fprintf(stderr, "Illegal %s\n", "interface basename");
1465                                 exit(1);
1466                         }
1467                         strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
        /* NOTE(review): on the first iteration rule points at the very start
           of parms[Xflag].arg, so (rule - 1) reads/writes one byte BEFORE the
           buffer -- out-of-bounds access; confirm and fix upstream. */
1468                         if (!*(rule - 1)) *(rule - 1) = ',';
1469                         rule = strtok(NULL, ",");
1470                         if (!rule) {
1471                                 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1472                                 exit(1);
1473                         }
1474                         snmp_rules[i].base = atoi(rule);
1475                         *(rule - 1) = ':';
1476                         rule = strtok(NULL, ":");
1477                 }
1478                 nsnmp_rules = i;
1479         }
        /* -t: emit rate limit as "bytes:delay".  sscanf's result is not
           checked, so both fields silently keep their defaults on malformed
           input. */
1480         if (parms[tflag].count)
1481                 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
        /* -a: resolve the local (source) address used for export sockets.
           The bad_lhost label is also jumped to from the per-collector
           "/lhost" parsing further down. */
1482         if (parms[aflag].count) {
1483                 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1484 bad_lhost:
1485                         fprintf(stderr, "Illegal %s\n", "source address");
1486                         exit(1);
1487                 } else {
1488                         saddr = *((struct sockaddr_in *) res->ai_addr);
1489                         freeaddrinfo(res);
1490                 }
1491         }
        /* -u: look up the account to drop privileges to; the actual
           setgroups/setregid/setreuid happens after thread creation below. */
1492         if (parms[uflag].count) 
1493                 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1494                         fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1495                         exit(1);
1496                 }
1497
1498
1499         /* Process collectors parameters. Brrrr... :-[ */
1500
        /* Each remaining argv entry is a collector spec:
           "dhost:dport[/lhost[/type]]" where type is 'm' (mirror, default)
           or 'r' (rotate). */
1501         npeers = argc - optind;
        /* NOTE(review): "npeers > 1" means a SINGLE collector argument is
           ignored and control falls through to the -f / usage() branches --
           presumably this should test npeers >= 1; confirm against intended
           usage. */
1502         if (npeers > 1) {
1503                 /* Send to remote Netflow collector */
1504                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1505                 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1506                         dhost = argv[i];
1507                         if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1508                         *dport++ = 0;
1509                         if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1510                                 fprintf(stderr, "socket(): %s\n", strerror(errno));
1511                                 exit(1);
1512                         }
1513                         peers[npeers].write_fd = write_fd;
1514                         peers[npeers].type = PEER_MIRROR;
1515                         peers[npeers].laddr = saddr;
1516                         peers[npeers].seq = 0;
        /* Optional "/lhost" and "/type" suffixes are split off in place. */
1517                         if ((lhost = strchr(dport, '/'))) {
1518                                 *lhost++ = 0;
1519                                 if ((type = strchr(lhost, '/'))) {
1520                                         *type++ = 0;
1521                                         switch (*type) {
1522                                                 case 0:
1523                                                 case 'm':
1524                                                         break;
1525
1526                                                 case 'r':
1527                                                         peers[npeers].type = PEER_ROTATE;
1528                                                         npeers_rot++;
1529                                                         break;
1530
1531                                                 default:
1532                                                         goto bad_collector;
1533                                         }
1534                                 }
1535                                 if (*lhost) {
1536                                         if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1537                                         peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1538                                         freeaddrinfo(res);
1539                                 }
1540                         }
        /* Bind to the per-peer local address, then connect the UDP socket so
           plain write()s reach the collector. */
1541                         if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1542                                                 sizeof(struct sockaddr_in))) {
1543                                 fprintf(stderr, "bind(): %s\n", strerror(errno));
1544                                 exit(1);
1545                         }
1546                         if (getaddrinfo(dhost, dport, &hints, &res)) {
1547 bad_collector:
1548                                 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1549                                 exit(1);
1550                         }
1551                         peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1552                         freeaddrinfo(res);
1553                         if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1554                                                 sizeof(struct sockaddr_in))) {
1555                                 fprintf(stderr, "connect(): %s\n", strerror(errno));
1556                                 exit(1);
1557                         }
1558
1559                         /* Restore command line */
1560                         if (type) *--type = '/';
1561                         if (lhost) *--lhost = '/';
1562                         *--dport = ':';
1563                 }
1564         }
1565         else if (parms[fflag].count) {
1566                 // log into a file
        /* NOTE(review): npeers is 0 here, so malloc(npeers * sizeof(...)) is
           malloc(0) yet peers[0] is written below -- heap overflow.  Also the
           fname allocation lacks +1 for the NUL terminator, and the strncpy
           uses MAX_PATH_LEN as the bound, writing past the (shorter)
           allocation.  snprintf into a MAX_PATH_LEN+1 buffer would fix both;
           confirm MAX_PATH_LEN semantics before changing. */
1567                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1568                 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1569                 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1570
1571                 peers[npeers].write_fd = START_DATA_FD;
1572                 peers[npeers].type = PEER_FILE;
1573                 peers[npeers].seq = 0;
1574
1575                 read_cur_epoch();
1576                 npeers++;
1577         }
1578         else 
1579                 usage();
1580
1581
        /* Allocate the packet capture buffer and open the netlink ULOG socket
           (group mask from -u/ulog_gmask). */
1582         if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1583         ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1584         if (!ulog_handle) {
1585                 fprintf(stderr, "libipulog initialization error: %s",
1586                                 ipulog_strerror(ipulog_errno));
1587                 exit(1);
1588         }
        /* -B: enlarge the kernel receive buffer; failure is only warned about,
           not fatal. */
1589         if (sockbufsize)
1590                 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1591                                         &sockbufsize, sizeof(sockbufsize)) < 0)
1592                         fprintf(stderr, "setsockopt(): %s", strerror(errno));
1593
1594         /* Daemonize (if log destination stdout-free) */
1595
1596         my_log_open(ident, verbosity, log_dest);
1597
1598         init_daemon();
1599
        /* log_dest bit 1 (value 2) means "log to stdout"; when it is clear we
           detach and install a supervisor loop. */
1600         if (!(log_dest & 2)) {
1601                 /* Crash-proofing - Sapan*/
        /* Supervisor: the parent blocks in wait3() and re-forks a worker each
           time the child dies; only the child breaks out of the loop to run
           the rest of main().  NOTE(review): this local `int pid` shadows the
           pid variable assigned at the bottom of this section, and the
           freopen() results are unchecked -- confirm both are intentional. */
1602                 while (1) {
1603                         int pid=fork();
1604                         if (pid==-1) {
1605                                 fprintf(stderr, "fork(): %s", strerror(errno));
1606                                 exit(1);
1607                         }
1608                         else if (pid==0) {
1609                                 setsid();
1610                                 freopen("/dev/null", "r", stdin);
1611                                 freopen("/dev/null", "w", stdout);
1612                                 freopen("/dev/null", "w", stderr);
1613                                 break;
1614                         }
1615                         else {
1616                                 while (wait3(NULL,0,NULL) < 1);
1617                         }
1618                 }
1619         } else {
        /* Foreground mode: make stdout/stderr unbuffered so log lines appear
           immediately. */
1620                 setvbuf(stdout, (char *)0, _IONBF, 0);
1621                 setvbuf(stderr, (char *)0, _IONBF, 0);
1622         }
1623
        /* Append "[pid]" to the syslog ident string. */
1624         pid = getpid();
1625         sprintf(errpbuf, "[%ld]", (long) pid);
1626         strcat(ident, errpbuf);
1627
1628         /* Initialization */
1629
1630         hash_init(); /* Actually for crc16 only */
1631         mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
        /* One mutex per hash bucket for the flow table. */
1632         for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1633
1634 #ifdef UPTIME_TRICK
1635         /* Hope 12 days is enough :-/ */
1636         start_time_offset = 1 << 20;
1637
1638         /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1639 #endif
1640         gettime(&start_time);
1641
1642         /*
1643            Build static pending queue as circular buffer.
1644            */
        /* pending_queue_length elements linked into a ring; head == tail means
           the queue is empty.  err_mem_alloc is also reached by goto from the
           initial mem_alloc() above. */
1645         if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1646         pending_tail = pending_head;
1647         for (i = pending_queue_length - 1; i--;) {
1648                 if (!(pending_tail->next = mem_alloc())) {
1649 err_mem_alloc:
1650                         my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1651                         exit(1);
1652                 }
1653                 pending_tail = pending_tail->next;
1654         }
1655         pending_tail->next = pending_head;
1656         pending_tail = pending_head;
1657
        /* Install the SIGTERM (and, in debug builds, SIGUSR1) handler, then
           block those signals process-wide so the worker threads created
           below inherit the blocked mask; main() unblocks them again just
           before its wait loop. */
1658         sigemptyset(&sig_mask);
1659         sigact.sa_handler = &sighandler;
1660         sigact.sa_mask = sig_mask;
1661         sigact.sa_flags = 0;
1662         sigaddset(&sig_mask, SIGTERM);
1663         sigaction(SIGTERM, &sigact, 0);
1664 #if ((DEBUG) & DEBUG_I)
1665         sigaddset(&sig_mask, SIGUSR1);
1666         sigaction(SIGUSR1, &sigact, 0);
1667 #endif
1668         if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1669                 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1670                 exit(1);
1671         }
1672
1673         my_log(LOG_INFO, "Starting %s...", VERSION);
1674
        /* -c: chroot into the given directory (done while still root, before
           the privilege drop below). */
1675         if (parms[cflag].count) {
1676                 if (chdir(parms[cflag].arg) || chroot(".")) {
1677                         my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1678                         exit(1);
1679                 }
1680         }
1681
        /* Spawn the THREADS-1 worker threads (detached), each with a
           successively higher realtime priority when -r gave a positive base
           priority; main() itself keeps the highest value. */
1682         schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1683         pthread_attr_init(&tattr);
1684         for (i = 0; i < THREADS - 1; i++) {
1685                 if (schedp.sched_priority > 0) {
1686                         if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1687                                         (pthread_attr_setschedparam(&tattr, &schedp))) {
1688                                 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1689                                 exit(1);
1690                         }
1691                 }
1692                 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1693                         my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1694                         exit(1);
1695                 }
1696                 pthread_detach(thid);
1697                 schedp.sched_priority++;
1698         }
1699
        /* -u: drop privileges to the looked-up account.  Order matters:
           supplementary groups first, then gid, then uid -- once setreuid()
           succeeds the process can no longer change its groups. */
1700         if (pw) {
1701                 if (setgroups(0, NULL)) {
1702                         my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1703                         exit(1);
1704                 }
1705                 if (setregid(pw->pw_gid, pw->pw_gid)) {
1706                         my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1707                         exit(1);
1708                 }
1709                 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1710                         my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1711                         exit(1);
1712                 }
1713         }
1714
        /* Record our pid; failure to create the pidfile is non-fatal. */
1715         if (!(pidfile = fopen(pidfilepath, "w")))
1716                 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1717         else {
1718                 fprintf(pidfile, "%ld\n", (long) pid);
1719                 fclose(pidfile);
1720         }
1721
        /* Log the effective configuration, the SNMP rules, and each peer. */
1722         my_log(LOG_INFO, "pid: %d", pid);
1723         my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1724                         "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1725                         ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1726                         netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1727                         memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1728                         emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1729                         parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1730         for (i = 0; i < nsnmp_rules; i++) {
1731                 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1732                                 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1733         }
1734         for (i = 0; i < npeers; i++) {
        /* NOTE(review): no case for PEER_FILE and no default, so `c` is read
           uninitialized below when a file peer is logged -- confirm and add a
           PEER_FILE case (e.g. 'f'). */
1735                 switch (peers[i].type) {
1736                         case PEER_MIRROR:
1737                                 c = 'm';
1738                                 break;
1739                         case PEER_ROTATE:
1740                                 c = 'r';
1741                                 break;
1742                 }
        /* errpbuf holds the local-address string because inet_ntoa() returns
           a single static buffer and is called twice in the next my_log(). */
1743                 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1744                 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1745                                 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1746         }
1747
        /* Re-enable SIGTERM/SIGUSR1 delivery now that the worker threads are
           running with the signals blocked. */
1748         pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1749
        /* Main wait loop: keep running until SIGTERM was seen AND every flow
           element has been returned (nothing in-flight, nothing pending).
           select() with empty fd sets is used as an interruptible sleep of
           scan_interval seconds. */
1750         timeout.tv_usec = 0;
1751         while (!killed
1752                         || (total_elements - free_elements - pending_queue_length)
1753                         || emit_count
1754                         || pending_tail->flags) {
1755
1756                 if (!sigs) {
1757                         timeout.tv_sec = scan_interval;
1758                         select(0, 0, 0, 0, &timeout);
1759                 }
1760
        /* On the first SIGTERM: collapse all lifetimes/timeouts to force an
           immediate flush of the flow cache, then wake the worker threads. */
1761                 if (sigs & SIGTERM_MASK && !killed) {
1762                         sigs &= ~SIGTERM_MASK;
1763                         my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1764                         scan_interval = 1;
1765                         frag_lifetime = -1;
1766                         active_lifetime = -1;
1767                         inactive_lifetime = -1;
1768                         emit_timeout = 1;
1769                         unpending_timeout = 1;
1770                         killed = 1;
1771                         pthread_cond_signal(&scan_cond);
1772                         pthread_cond_signal(&unpending_cond);
1773                 }
1774
1775 #if ((DEBUG) & DEBUG_I)
1776                 if (sigs & SIGUSR1_MASK) {
1777                         sigs &= ~SIGUSR1_MASK;
1778                         info_debug();
1779                 }
1780 #endif
1781         }
1782         remove(pidfilepath);
1783 #if ((DEBUG) & DEBUG_I)
1784         info_debug();
1785 #endif
1786         my_log(LOG_INFO, "Done.");
1787 #ifdef WALL
1788         return 0;
1789 #endif
1790 }