Cleaned up the code, replacing references to xid with slice_id
[fprobe-ulog.git] / src / fprobe-ulog.c
1 /*
2         Copyright (C) Slava Astashonok <sla@0n.ru>
3
4         This program is free software; you can redistribute it and/or
5         modify it under the terms of the GNU General Public License.
6
7         $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
8
9                         Sapan Bhatia <sapanb@cs.princeton.edu> 
10                         
11         7/11/2007       Added data collection (-f) functionality, slice_id support in the header and log file
12                         rotation.
13
14         15/11/2007      Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
15
16 */
17
18 #include <common.h>
19
20 /* stdout, stderr, freopen() */
21 #include <stdio.h>
22
23 /* atoi(), exit() */
24 #include <stdlib.h>
25
26 /* getopt(), alarm(), getpid(), sedsid(), chdir() */
27 #include <unistd.h>
28
29 /* strerror() */
30 #include <string.h>
31
32 /* sig*() */
33 #include <signal.h>
34
35 /* statfs() */
36
37 #include <sys/vfs.h>
38
39 #include <libipulog/libipulog.h>
40 #include "vserver.h"
41
/*
 * Local re-declaration of libipulog's otherwise-opaque handle so this
 * file can reach the underlying netlink socket state directly.
 * NOTE(review): must match libipulog's internal layout exactly —
 * verify whenever the libipulog dependency is upgraded.
 */
struct ipulog_handle {
	int fd;				/* netlink socket descriptor */
	u_int8_t blocking;		/* nonzero => blocking reads (presumably; confirm in libipulog) */
	struct sockaddr_nl local;	/* our netlink address */
	struct sockaddr_nl peer;	/* kernel-side netlink address */
	struct nlmsghdr* last_nlhdr;	/* cursor into the last received message batch */
};
49
50 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
51 #include <sys/types.h>
52 #include <netinet/in_systm.h>
53 #include <sys/socket.h>
54 #include <netinet/in.h>
55 #include <arpa/inet.h>
56 #include <netinet/ip.h>
57 #include <netinet/tcp.h>
58 #include <netinet/udp.h>
59 #include <netinet/ip_icmp.h>
60 #include <net/if.h>
61
62 #include <sys/param.h>
63 #include <pwd.h>
64 #ifdef OS_LINUX
65 #include <grp.h>
66 #endif
67
68 /* pthread_*() */
69 #include <pthread.h>
70
71 /* errno */
72 #include <errno.h>
73
74 /* getaddrinfo() */
75 #include <netdb.h>
76
77 /* nanosleep() */
78 #include <time.h>
79
80 /* gettimeofday() */
81 #include <sys/time.h>
82
83 /* scheduling */
84 #include <sched.h>
85
86 /* select() (POSIX)*/
87 #include <sys/select.h>
88
89 /* open() */
90 #include <sys/stat.h>
91 #include <fcntl.h>
92
93 #include <fprobe-ulog.h>
94 #include <my_log.h>
95 #include <my_getopt.h>
96 #include <netflow.h>
97 #include <hash.h>
98 #include <mem.h>
99
100 #define PIDFILE                 "/var/log/fprobe-ulog.pid"
101 #define LAST_EPOCH_FILE         "/var/log/fprobe_last_epoch"
102 #define MAX_EPOCH_SIZE          sizeof("32767")
103 #define STD_NETFLOW_PDU
104
/*
 * Indexes of the command-line options.
 * NOTE(review): option state is looked up positionally (see the
 * mark_is_tos macro, which reads parms[Mflag].count), so this order is
 * load-bearing and must stay in sync with the parms[] table below.
 */
enum {
	aflag,
	Bflag,
	bflag,
	cflag,
	dflag,
	Dflag,
	eflag,
	Eflag,
	fflag,
	gflag,
	hflag,
	lflag,
	mflag,
	Mflag,
	nflag,
	qflag,
	rflag,
	sflag,
	tflag,
	Tflag,
	Uflag,
	uflag,
	vflag,
	Wflag,
	Xflag,
};
132
133 static struct getopt_parms parms[] = {
134         {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
135         {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
136         {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
137         {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
138         {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
139         {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
140         {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
141         {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
142         {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
143         {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
144         {'h', 0, 0, 0},
145         {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
146         {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
147         {'M', 0, 0, 0},
148         {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
149         {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
150         {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
151         {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
152         {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
153         {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
154         {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
155         {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
156         {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
157         {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
158         {0, 0, 0, 0}
159 };
160
/* Provided by my_getopt / libc */
extern char *optarg;
extern int optind, opterr, optopt;
extern int errno;

/* PDU layout descriptors for the supported NetFlow versions (netflow.c) */
extern struct NetFlow NetFlow1;
extern struct NetFlow NetFlow5;
extern struct NetFlow NetFlow7;

/* Sentinel fd meaning "data file not opened yet" (see get_data_file_fd) */
#define START_DATA_FD -5
/* -M: treat the netfilter mark value as the ToS byte */
#define mark_is_tos parms[Mflag].count
static unsigned scan_interval = 5;	/* -s: seconds between expired-flow scans */
static unsigned int min_free = 0;	/* -D: min free disk blocks before dropping data */
static int frag_lifetime = 30;		/* -g: fragmented-flow lifetime, seconds */
static int inactive_lifetime = 60;	/* inactive-flow timeout, seconds */
static int active_lifetime = 300;	/* -e: active-flow timeout, seconds */
static int sockbufsize;			/* -B: kernel capture buffer size */
#define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
#if (MEM_BITS == 0) || (MEM_BITS == 16)
#define BULK_QUANTITY 10000
#else
#define BULK_QUANTITY 200
#endif

/* Epoch bookkeeping for the rotating on-disk log (-E/-T/-W) */
static unsigned epoch_length=60, log_epochs=1; 
static unsigned cur_epoch=0,prev_uptime=0,last_peak=0;

static unsigned bulk_quantity = BULK_QUANTITY;	/* -b: memory bulk size, flows */
static unsigned pending_queue_length = 100;	/* -q: pending queue length */
static struct NetFlow *netflow = &NetFlow5;	/* -n: selected PDU format */
static unsigned verbosity = 6;			/* -v: max log level */
static unsigned log_dest = MY_LOG_SYSLOG;	/* -l: log destination */
static struct Time start_time;			/* base timestamp for uptime computation */
static long start_time_offset;
static int off_tl;
/* From mem.c */
extern unsigned total_elements;
extern unsigned free_elements;
extern unsigned total_memory;
#if ((DEBUG) & DEBUG_I)
/* Instrumentation counters, reported by info_debug() on SIGUSR1 */
static unsigned emit_pkts, emit_queue;
static uint64_t size_total;
static unsigned pkts_total, pkts_total_fragmented;
static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
static unsigned pkts_pending, pkts_pending_done;
static unsigned pending_queue_trace, pending_queue_trace_candidate;
static unsigned flows_total, flows_fragmented;
#endif
static unsigned emit_count;		/* flows currently serialized into emit_packet */
static uint32_t emit_sequence;
static unsigned emit_rate_bytes, emit_rate_delay;	/* -t: send pacing */
static struct Time emit_time;		/* timestamp stamped into the PDU header */
static uint8_t emit_packet[NETFLOW_MAX_PACKET];		/* outgoing PDU buffer */
static pthread_t thid;
static sigset_t sig_mask;
static struct sched_param schedp;	/* -r: real-time priority */
static int sched_min, sched_max;
static int npeers, npeers_rot;		/* collector endpoints */
static struct peer *peers;
static int sigs;			/* pending-signal bitmask, set by sighandler() */

/* Hash table of live flows, one mutex per bucket */
static struct Flow *flows[1 << HASH_BITS];
static pthread_mutex_t flows_mutex[1 << HASH_BITS];

static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;

static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
static struct Flow *pending_head, *pending_tail;
static struct Flow *scan_frag_dreg;

/* Queue of expired flows consumed by emit_thread() */
static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
static struct Flow *flows_emit;

static char ident[256] = "fprobe-ulog";	/* -l: log/pidfile identifier */
static FILE *pidfile;
static char *pidfilepath;
static pid_t pid;
static int killed;
static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
static struct ipulog_handle *ulog_handle;	/* ULOG netlink capture handle */
static uint32_t ulog_gmask = 1;			/* -U: ULOG group bitmask */
static char *cap_buf;
static int nsnmp_rules;				/* -X: ifname -> SNMP-index rules */
static struct snmp_rule *snmp_rules;
static struct passwd *pw = 0;			/* -u: user to drop privileges to, if any */
248
249 void usage()
250 {
251         fprintf(stdout,
252                         "fprobe-ulog: a NetFlow probe. Version %s\n"
253                         "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
254                         "\n"
255                         "-h\t\tDisplay this help\n"
256                         "-U <mask>\tULOG group bitwise mask [1]\n"
257                         "-s <seconds>\tHow often scan for expired flows [5]\n"
258                         "-g <seconds>\tFragmented flow lifetime [30]\n"
259                         "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
260                         "-f <filename>\tLog flow data in a file\n"
261                         "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
262                         "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
263                         "-a <address>\tUse <address> as source for NetFlow flow\n"
264                         "-X <rules>\tInterface name to SNMP-index conversion rules\n"
265                         "-M\t\tUse netfilter mark value as ToS flag\n"
266                         "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
267                         "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
268                         "-q <flows>\tPending queue length [100]\n"
269                         "-B <kilobytes>\tKernel capture buffer size [0]\n"
270                         "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
271                         "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
272                         "-c <directory>\tDirectory to chroot to\n"
273                         "-u <user>\tUser to run as\n"
274                         "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
275                         "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
276                         "-y <remote:port>\tAddress of the NetFlow collector\n"
277                         "-f <writable file>\tFile to write data into\n"
278                         "-T <n>\tRotate log file every n epochs\n"
279                         "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
280                         "-E <[1..60]>\tSize of an epoch in minutes\n"
281                         "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
282                         ,
283                 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
284         exit(0);
285 }
286
#if ((DEBUG) & DEBUG_I)
/*
 * Dump the instrumentation counters to the log (triggered via SIGUSR1).
 * NOTE(review): size_total is uint64_t but printed with %lld — matches
 * long long on the glibc targets this was written for; confirm if ported.
 */
void info_debug()
{
	my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
			pkts_total, pkts_total_fragmented, size_total,
			pkts_pending - pkts_pending_done, pending_queue_trace);
	my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
			pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
	my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
			flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
	my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
			total_elements, free_elements, total_memory);
}
#endif
301
302 void sighandler(int sig)
303 {
304         switch (sig) {
305                 case SIGTERM:
306                         sigs |= SIGTERM_MASK;
307                         break;
308 #if ((DEBUG) & DEBUG_I)
309                 case SIGUSR1:
310                         sigs |= SIGUSR1_MASK;
311                         break;
312 #endif
313         }
314 }
315
316 void gettime(struct Time *now)
317 {
318         struct timeval t;
319
320         gettimeofday(&t, 0);
321         now->sec = t.tv_sec;
322         now->usec = t.tv_usec;
323 }
324
325
/* Difference t1 - t2 in whole minutes (sub-minute remainder truncated). */
inline time_t cmpMtime(struct Time *t1, struct Time *t2)
{
	return (t1->sec - t2->sec)/60;
}
330
/* Difference t1 - t2 in milliseconds (seconds and microseconds combined). */
inline time_t cmpmtime(struct Time *t1, struct Time *t2)
{
	return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
}
335
336 /* Uptime in miliseconds */
337 uint32_t getuptime(struct Time *t)
338 {
339         /* Maximum uptime is about 49/2 days */
340         return cmpmtime(t, &start_time);
341 }
342
343 /* Uptime in minutes */
344 uint32_t getuptime_minutes(struct Time *t)
345 {
346         /* Maximum uptime is about 49/2 days */
347         return cmpMtime(t, &start_time);
348 }
349
350 hash_t hash_flow(struct Flow *flow)
351 {
352         if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
353         else return hash(flow, sizeof(struct Flow_TL));
354 }
355
356 uint16_t snmp_index(char *name) {
357         uint32_t i;
358
359         if (!*name) return 0;
360
361         for (i = 0; (int) i < nsnmp_rules; i++) {
362                 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
363                 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
364         }
365
366         if ((i = if_nametoindex(name))) return i;
367
368         return -1;
369 }
370
/*
 * Copy the accounting payload of *src into *dst, field by field.
 * Deliberately leaves dst->next untouched so the destination's list
 * linkage is managed solely by the caller (see put_into()).
 */
inline void copy_flow(struct Flow *src, struct Flow *dst)
{
	dst->iif = src->iif;
	dst->oif = src->oif;
	dst->sip = src->sip;
	dst->dip = src->dip;
	dst->tos = src->tos;
	dst->slice_id = src->slice_id;
	dst->proto = src->proto;
	dst->tcp_flags = src->tcp_flags;
	dst->id = src->id;
	dst->sp = src->sp;
	dst->dp = src->dp;
	dst->pkts = src->pkts;
	dst->size = src->size;
	dst->sizeF = src->sizeF;
	dst->sizeP = src->sizeP;
	dst->ctime = src->ctime;
	dst->mtime = src->mtime;
	dst->flags = src->flags;
}
392
393 void read_cur_epoch() {
394         int fd;
395         /* Reset to -1 in case the read fails */
396         cur_epoch=-1;
397         fd = open(LAST_EPOCH_FILE, O_RDONLY);
398         if (fd != -1) {
399                 char snum[MAX_EPOCH_SIZE];
400                 ssize_t len;
401                 len = read(fd, snum, MAX_EPOCH_SIZE-1);
402                 if (len != -1) {
403                         snum[len]='\0';
404                         sscanf(snum,"%d",&cur_epoch);
405                         cur_epoch++; /* Let's not stone the last epoch */
406                         close(fd);
407                 }
408         }
409         return;
410 }
411
412
413 /* Dumps the current epoch in a file to cope with
414  * reboots and killings of fprobe */
415
416 void update_cur_epoch_file(int n) {
417         int fd, len;
418         char snum[MAX_EPOCH_SIZE];
419         len=snprintf(snum, MAX_EPOCH_SIZE-1,"%d", n);
420         fd = open(LAST_EPOCH_FILE, O_RDWR|O_CREAT|O_TRUNC);
421         if (fd == -1) {
422                 my_log(LOG_ERR, "open() failed: %s.The next restart will resume logging from epoch id 0.",LAST_EPOCH_FILE);
423                 return;
424         }
425         write(fd, snum, len);
426         close(fd);
427 }
428
429 /* Get the file descriptor corresponding to the current file.
430  * The kludgy implementation is to abstract away the 'current
431  * file descriptor', which may also be a socket. 
432  */
433
434 unsigned get_data_file_fd(char *fname, int cur_fd) {
435         struct Time now;
436         unsigned cur_uptime;
437
438         struct statfs statfs;
439         int ret_fd;
440
441         /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
442          * doesn't solve the problem */
443         gettime(&now);
444         cur_uptime = getuptime_minutes(&now);
445
446         if (cur_fd != START_DATA_FD) {
447                 if (fstatfs(cur_fd, &statfs) == -1) {
448                         my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
449                 }
450                 else {
451                         if (min_free && (statfs.f_bavail < min_free)
452                                         && (cur_epoch==last_peak))
453                         {
454                                 my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
455                                 cur_epoch = -1;
456                         }
457                         /*
458                            else 
459                            assume that we can reclaim space by overwriting our own files
460                            and that the difference in size will not fill the disk - sapan
461                            */
462                 }
463         }
464
465         /* If epoch length has been exceeded, 
466          * or we're starting up 
467          * or we're going back to the first epoch */
468         if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
469                 char nextname[MAX_PATH_LEN];
470                 int write_fd;
471                 prev_uptime = cur_uptime;
472                 cur_epoch = (cur_epoch + 1) % log_epochs;
473                 if (cur_epoch>last_peak) last_peak = cur_epoch;
474                 if (cur_fd>0)
475                         close(cur_fd);
476                 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
477                 if ((write_fd = open(nextname, O_RDWR|O_CREAT|O_TRUNC)) < 0) {
478                         my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
479                         exit(1);
480                 }
481                 if (fchmod(write_fd,S_IRUSR|S_IWUSR|S_IROTH|S_IRGRP) == -1) {
482                         my_log(LOG_ERR, "fchmod() failed: %s (%s). Continuing...\n", nextname, strerror(errno));
483                 }
484                 update_cur_epoch_file(cur_epoch);
485                 ret_fd = write_fd;
486         }
487         else {
488                 ret_fd = cur_fd;
489         }
490         return(ret_fd);
491 }
492
/*
 * Search the singly-linked bucket list `where` for an entry matching
 * `what`: same src/dst address and protocol, plus equal ports when both
 * are unfragmented (FRAGMASK sum 0) or equal IP id when both are
 * fragmented (sum 2); a fragmented/unfragmented mix never matches.
 * Returns the matching flow, or NULL when the list is exhausted.
 *
 * If `prev` is non-NULL, *prev is updated to point at the `next` field
 * (or list head pointer) that links the returned flow, so the caller
 * can unlink it in O(1) — see the reassembly path in put_into().
 */
struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
{
	struct Flow **flowpp;

#ifdef WALL
	flowpp = 0;
#endif

	if (prev) flowpp = *prev;

	while (where) {
		if (where->sip.s_addr == what->sip.s_addr
				&& where->dip.s_addr == what->dip.s_addr
				&& where->proto == what->proto) {
			switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
				case 0:
					/* Both unfragmented */
					if ((what->sp == where->sp)
							&& (what->dp == where->dp)) goto done;
					break;
				case 2:
					/* Both fragmented */
					if (where->id == what->id) goto done;
					break;
			}
		}
		flowpp = &where->next;
		where = where->next;
	}
done:
	if (prev) *prev = flowpp;
	return where;
}
526
527         int put_into(struct Flow *flow, int flag
528 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
529                         , char *logbuf
530 #endif
531                     )
532 {
533         int ret = 0;
534         hash_t h;
535         struct Flow *flown, **flowpp;
536 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
537         char buf[64];
538 #endif
539
540         h = hash_flow(flow);
541 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
542         sprintf(buf, " %x H:%04x", (unsigned) flow, h);
543         strcat(logbuf, buf);
544 #endif
545         pthread_mutex_lock(&flows_mutex[h]);
546         flowpp = &flows[h];
547         if (!(flown = find(flows[h], flow, &flowpp))) {
548                 /* No suitable flow found - add */
549                 if (flag == COPY_INTO) {
550                         if ((flown = mem_alloc())) {
551                                 copy_flow(flow, flown);
552                                 flow = flown;
553                         } else {
554 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
555                                 my_log(LOG_ERR, "%s %s. %s",
556                                                 "mem_alloc():", strerror(errno), "packet lost");
557 #endif
558                                 return -1;
559                         }
560                 }
561                 flow->next = flows[h];
562                 flows[h] = flow;
563 #if ((DEBUG) & DEBUG_I)
564                 flows_total++;
565                 if (flow->flags & FLOW_FRAG) flows_fragmented++;
566 #endif
567 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
568                 if (flown) {
569                         sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
570                         strcat(logbuf, buf);
571                 }
572 #endif
573         } else {
574                 /* Found suitable flow - update */
575 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
576                 sprintf(buf, " +> %x", (unsigned) flown);
577                 strcat(logbuf, buf);
578 #endif
579                 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
580                         flown->mtime = flow->mtime;
581                 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
582                         flown->ctime = flow->ctime;
583                 flown->tcp_flags |= flow->tcp_flags;
584                 flown->size += flow->size;
585                 flown->pkts += flow->pkts;
586
587                 /* The slice_id of the first slice_id of a flow is misleading. Reset the slice_id of the flow
588                  * if a better value comes along. A good example of this is that by the time CoDemux sets the
589                  * peercred of a flow, it has already been accounted for here and attributed to root. */
590
591                 if (flown->slice_id<1)
592                                 flown->slice_id = flow->slice_id;
593
594
595                 if (flow->flags & FLOW_FRAG) {
596                         /* Fragmented flow require some additional work */
597                         if (flow->flags & FLOW_TL) {
598                                 /*
599                                    ?FIXME?
600                                    Several packets with FLOW_TL (attack)
601                                    */
602                                 flown->sp = flow->sp;
603                                 flown->dp = flow->dp;
604                         }
605                         if (flow->flags & FLOW_LASTFRAG) {
606                                 /*
607                                    ?FIXME?
608                                    Several packets with FLOW_LASTFRAG (attack)
609                                    */
610                                 flown->sizeP = flow->sizeP;
611                         }
612                         flown->flags |= flow->flags;
613                         flown->sizeF += flow->sizeF;
614                         if ((flown->flags & FLOW_LASTFRAG)
615                                         && (flown->sizeF >= flown->sizeP)) {
616                                 /* All fragments received - flow reassembled */
617                                 *flowpp = flown->next;
618                                 pthread_mutex_unlock(&flows_mutex[h]);
619 #if ((DEBUG) & DEBUG_I)
620                                 flows_total--;
621                                 flows_fragmented--;
622 #endif
623                                 flown->id = 0;
624                                 flown->flags &= ~FLOW_FRAG;
625 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
626                                 strcat(logbuf," R");
627 #endif
628                                 ret = put_into(flown, MOVE_INTO
629 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
630                                                 , logbuf
631 #endif
632                                               );
633                         }
634                 }
635                 if (flag == MOVE_INTO) mem_free(flow);
636         }
637         pthread_mutex_unlock(&flows_mutex[h]);
638         return ret;
639 }
640
/* Guard so fill()'s broken-masks warning is logged at most once. */
int onlyonce=0;
642
643 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
644 {
645         int i;
646
647         for (i = 0; i < fields; i++) {
648 #if ((DEBUG) & DEBUG_F)
649                 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
650 #endif
651                 switch (format[i]) {
652                         case NETFLOW_IPV4_SRC_ADDR:
653                                 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
654                                 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
655                                 break;
656
657                         case NETFLOW_IPV4_DST_ADDR:
658                                 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
659                                 if ((flow->dip.s_addr == inet_addr("10.0.0.8"))) {
660                                         my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
661                                 }
662                                 p += NETFLOW_IPV4_DST_ADDR_SIZE;
663                                 break;
664
665                         case NETFLOW_INPUT_SNMP:
666                                 *((uint16_t *) p) = htons(flow->iif);
667                                 p += NETFLOW_INPUT_SNMP_SIZE;
668                                 break;
669
670                         case NETFLOW_OUTPUT_SNMP:
671                                 *((uint16_t *) p) = htons(flow->oif);
672                                 p += NETFLOW_OUTPUT_SNMP_SIZE;
673                                 break;
674
675                         case NETFLOW_PKTS_32:
676                                 *((uint32_t *) p) = htonl(flow->pkts);
677                                 p += NETFLOW_PKTS_32_SIZE;
678                                 break;
679
680                         case NETFLOW_BYTES_32:
681                                 *((uint32_t *) p) = htonl(flow->size);
682                                 p += NETFLOW_BYTES_32_SIZE;
683                                 break;
684
685                         case NETFLOW_FIRST_SWITCHED:
686                                 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
687                                 p += NETFLOW_FIRST_SWITCHED_SIZE;
688                                 break;
689
690                         case NETFLOW_LAST_SWITCHED:
691                                 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
692                                 p += NETFLOW_LAST_SWITCHED_SIZE;
693                                 break;
694
695                         case NETFLOW_L4_SRC_PORT:
696                                 *((uint16_t *) p) = flow->sp;
697                                 p += NETFLOW_L4_SRC_PORT_SIZE;
698                                 break;
699
700                         case NETFLOW_L4_DST_PORT:
701                                 *((uint16_t *) p) = flow->dp;
702                                 p += NETFLOW_L4_DST_PORT_SIZE;
703                                 break;
704
705                         case NETFLOW_PROT:
706                                 *((uint8_t *) p) = flow->proto;
707                                 p += NETFLOW_PROT_SIZE;
708                                 break;
709
710                         case NETFLOW_SRC_TOS:
711                                 *((uint8_t *) p) = flow->tos;
712                                 p += NETFLOW_SRC_TOS_SIZE;
713                                 break;
714
715                         case NETFLOW_TCP_FLAGS:
716                                 *((uint8_t *) p) = flow->tcp_flags;
717                                 p += NETFLOW_TCP_FLAGS_SIZE;
718                                 break;
719
720                         case NETFLOW_VERSION:
721                                 *((uint16_t *) p) = htons(netflow->Version);
722                                 p += NETFLOW_VERSION_SIZE;
723                                 break;
724
725                         case NETFLOW_COUNT:
726                                 *((uint16_t *) p) = htons(emit_count);
727                                 p += NETFLOW_COUNT_SIZE;
728                                 break;
729
730                         case NETFLOW_UPTIME:
731                                 *((uint32_t *) p) = htonl(getuptime(&emit_time));
732                                 p += NETFLOW_UPTIME_SIZE;
733                                 break;
734
735                         case NETFLOW_UNIX_SECS:
736                                 *((uint32_t *) p) = htonl(emit_time.sec);
737                                 p += NETFLOW_UNIX_SECS_SIZE;
738                                 break;
739
740                         case NETFLOW_UNIX_NSECS:
741                                 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
742                                 p += NETFLOW_UNIX_NSECS_SIZE;
743                                 break;
744
745                         case NETFLOW_FLOW_SEQUENCE:
746                                 //*((uint32_t *) p) = htonl(emit_sequence);
747                                 *((uint32_t *) p) = 0;
748                                 p += NETFLOW_FLOW_SEQUENCE_SIZE;
749                                 break;
750
751                         case NETFLOW_PAD8:
752                                 /* Unsupported (uint8_t) */
753                         case NETFLOW_ENGINE_TYPE:
754                         case NETFLOW_ENGINE_ID:
755                         case NETFLOW_FLAGS7_1:
756                         case NETFLOW_SRC_MASK:
757                         case NETFLOW_DST_MASK:
758                                 if (onlyonce) {
759                                         my_log(LOG_CRIT, "Adding SRC/DST masks: this version of fprobe is seriously broken\n");
760                                         onlyonce=1;
761                                 }
762                                 *((uint8_t *) p) = 0;
763                                 p += NETFLOW_PAD8_SIZE;
764                                 break;
765                         case NETFLOW_SLICE_ID:
766                                 *((uint32_t *) p) = flow->slice_id;
767                                 p += NETFLOW_SLICE_ID_SIZE;
768                                 break;
769                         case NETFLOW_PAD16:
770                                 /* Unsupported (uint16_t) */
771                         case NETFLOW_SRC_AS:
772                         case NETFLOW_DST_AS:
773                         case NETFLOW_FLAGS7_2:
774                                 *((uint16_t *) p) = 0;
775                                 p += NETFLOW_PAD16_SIZE;
776                                 break;
777
778                         case NETFLOW_PAD32:
779                                 /* Unsupported (uint32_t) */
780                         case NETFLOW_IPV4_NEXT_HOP:
781                         case NETFLOW_ROUTER_SC:
782                                 *((uint32_t *) p) = 0;
783                                 p += NETFLOW_PAD32_SIZE;
784                                 break;
785
786                         default:
787                                 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
788                                                 format, i, format[i]);
789                                 exit(1);
790                 }
791         }
792 #if ((DEBUG) & DEBUG_F)
793         my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
794 #endif
795         return p;
796 }
797
798 void setuser() {
799         /*
800            Workaround for clone()-based threads
801            Try to change EUID independently of main thread
802            */
803         if (pw) {
804                 setgroups(0, NULL);
805                 setregid(pw->pw_gid, pw->pw_gid);
806                 setreuid(pw->pw_uid, pw->pw_uid);
807         }
808 }
809
810 void *emit_thread()
811 {
812         struct Flow *flow;
813         void *p;
814         struct timeval now;
815         struct timespec timeout;
816         int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
817
818         p = (void *) &emit_packet + netflow->HeaderSize;
819         timeout.tv_nsec = 0;
820
821         setuser();
822
823         for (;;) {
824                 pthread_mutex_lock(&emit_mutex);
825                 while (!flows_emit) {
826                         gettimeofday(&now, 0);
827                         timeout.tv_sec = now.tv_sec + emit_timeout;
828                         /* Do not wait until emit_packet will filled - it may be too long */
829                         if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
830                                 pthread_mutex_unlock(&emit_mutex);
831                                 goto sendit;
832                         }
833                 }
834                 flow = flows_emit;
835                 flows_emit = flows_emit->next;
836 #if ((DEBUG) & DEBUG_I)
837                 emit_queue--;
838 #endif          
839                 pthread_mutex_unlock(&emit_mutex);
840
841 #ifdef UPTIME_TRICK
842                 if (!emit_count) {
843                         gettime(&start_time);
844                         start_time.sec -= start_time_offset;
845                 }
846 #endif
847                 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
848                 mem_free(flow);
849                 emit_count++;
850 #ifdef PF2_DEBUG
851                 printf("Emit count = %d\n", emit_count);
852                 fflush(stdout);
853 #endif
854                 if (emit_count == netflow->MaxFlows) {
855 sendit:
856                         gettime(&emit_time);
857                         p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
858                         size = netflow->HeaderSize + emit_count * netflow->FlowSize;
859                         /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
860 #ifdef STD_NETFLOW_PDU
861                         if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
862 #endif
863                         peer_rot_cur = 0;
864                         for (i = 0; i < npeers; i++) {
865                                 if (peers[i].type == PEER_FILE) {
866                                         if (netflow->SeqOffset)
867                                                 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
868                                         peers[i].write_fd = get_data_file_fd(peers[i].fname, peers[i].write_fd);
869                                         ret = write(peers[i].write_fd, emit_packet, size);
870                                         if (ret < size) {
871
872 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
873                                                 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
874                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
875 #endif
876 #undef MESSAGES
877                                         }
878 #if ((DEBUG) & DEBUG_E)
879                                         commaneelse {
880                                                 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
881                                                                 emit_count, i + 1, peers[i].seq);
882                                         }
883 #endif
884                                         peers[i].seq += emit_count;
885
886                                         /* Rate limit */
887                                         if (emit_rate_bytes) {
888                                                 sent += size;
889                                                 delay = sent / emit_rate_bytes;
890                                                 if (delay) {
891                                                         sent %= emit_rate_bytes;
892                                                         timeout.tv_sec = 0;
893                                                         timeout.tv_nsec = emit_rate_delay * delay;
894                                                         while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
895                                                 }
896                                         }
897                                 }
898                                 else
899                                         if (peers[i].type == PEER_MIRROR) goto sendreal;
900                                         else
901                                                 if (peers[i].type == PEER_ROTATE) 
902                                                         if (peer_rot_cur++ == peer_rot_work) {
903 sendreal:
904                                                                 if (netflow->SeqOffset)
905                                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
906                                                                 ret = send(peers[i].write_fd, emit_packet, size, 0);
907                                                                 if (ret < size) {
908 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
909                                                                         my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
910                                                                                         i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
911 #endif
912                                                                 }
913 #if ((DEBUG) & DEBUG_E)
914                                                                 commaneelse {
915                                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
916                                                                                         emit_count, i + 1, peers[i].seq);
917                                                                 }
918 #endif
919                                                                 peers[i].seq += emit_count;
920
921                                                                 /* Rate limit */
922                                                                 if (emit_rate_bytes) {
923                                                                         sent += size;
924                                                                         delay = sent / emit_rate_bytes;
925                                                                         if (delay) {
926                                                                                 sent %= emit_rate_bytes;
927                                                                                 timeout.tv_sec = 0;
928                                                                                 timeout.tv_nsec = emit_rate_delay * delay;
929                                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
930                                                                         }
931                                                                 }
932                                                         }
933                         }
934                         if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
935                         emit_sequence += emit_count;
936                         emit_count = 0;
937 #if ((DEBUG) & DEBUG_I)
938                         emit_pkts++;
939 #endif
940                 }
941         }
942 }       
943
void *unpending_thread()
{
	/*
	   Moves completed ("pending") flows produced by cap_thread() into
	   the main flow table via put_into().  Wakes up when signalled on
	   unpending_cond or at most every unpending_timeout seconds.
	   unpending_mutex is taken once and held for the lifetime of the
	   thread (released only inside pthread_cond_timedwait).
	   Runs forever; never returns.
	   */
	struct timeval now;
	struct timespec timeout;
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
	char logbuf[256];
#endif

	setuser();

	timeout.tv_nsec = 0;
	pthread_mutex_lock(&unpending_mutex);

	for (;;) {
		/* Wait until cap_thread() marks the tail slot as FLOW_PENDING */
		while (!(pending_tail->flags & FLOW_PENDING)) {
			gettimeofday(&now, 0);
			timeout.tv_sec = now.tv_sec + unpending_timeout;
			pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
		}

#if ((DEBUG) & (DEBUG_S | DEBUG_U))
		*logbuf = 0;
#endif
		/* COPY_INTO: the pending slot is recycled below, so the flow
		   data must be copied into the table rather than moved */
		if (put_into(pending_tail, COPY_INTO
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
					, logbuf
#endif
			    ) < 0) {
			/* put_into() failed (presumably allocation) - the packet
			   is dropped; only the counter records the loss */
#if ((DEBUG) & DEBUG_I)
			pkts_lost_unpending++;
#endif
		}

#if ((DEBUG) & DEBUG_U)
		my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
#endif

		/* Hand the slot back to cap_thread() and advance along the ring */
		pending_tail->flags = 0;
		pending_tail = pending_tail->next;
#if ((DEBUG) & DEBUG_I)
		pkts_pending_done++;
#endif
	}
}
988
void *scan_thread()
{
	/*
	   Expiry scanner: every scan_interval seconds walks all hash
	   chains and moves expired flows out of the table.
	     - Expired fragmented flows go onto the private scan_frag_dreg
	       chain and are re-inserted (MOVE_INTO) as ordinary flows at
	       the end of the pass.
	     - Expired complete flows are pushed onto flows_emit for
	       emit_thread(), which is then signalled.
	   Lock order: flows_mutex[i] is taken per chain; emit_mutex is
	   taken nested inside it when handing a flow to the emitter.
	   Runs forever; never returns.
	   */
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
	char logbuf[256];
#endif
	int i;
	struct Flow *flow, **flowpp;
	struct Time now;
	struct timespec timeout;

	setuser();

	timeout.tv_nsec = 0;
	pthread_mutex_lock(&scan_mutex);

	for (;;) {
		gettime(&now);
		timeout.tv_sec = now.sec + scan_interval;
		pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);

		gettime(&now);
#if ((DEBUG) & DEBUG_S)
		my_log(LOG_DEBUG, "S: %d", now.sec);
#endif
		for (i = 0; i < 1 << HASH_BITS ; i++) {
			pthread_mutex_lock(&flows_mutex[i]);
			flow = flows[i];
			/* flowpp always points at the link that leads to 'flow',
			   so an expired flow can be unlinked in place */
			flowpp = &flows[i];
			while (flow) {
				if (flow->flags & FLOW_FRAG) {
					/* Process fragmented flow */
					if ((now.sec - flow->mtime.sec) > frag_lifetime) {
						/* Fragmented flow expired - put it into special chain */
#if ((DEBUG) & DEBUG_I)
						flows_fragmented--;
						flows_total--;
#endif
						*flowpp = flow->next;
						/* Clear fragment identity so it re-hashes as a
						   normal flow when re-inserted below */
						flow->id = 0;
						flow->flags &= ~FLOW_FRAG;
						flow->next = scan_frag_dreg;
						scan_frag_dreg = flow;
						flow = *flowpp;
						continue;
					}
				} else {
					/* Flow is not fragmented */
					if ((now.sec - flow->mtime.sec) > inactive_lifetime
							|| (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
						/* Flow expired - unlink and queue for emission */
#if ((DEBUG) & DEBUG_S)
						my_log(LOG_DEBUG, "S: E %x", flow);
#endif
#if ((DEBUG) & DEBUG_I)
						flows_total--;
#endif
						*flowpp = flow->next;
						pthread_mutex_lock(&emit_mutex);
						flow->next = flows_emit;
						flows_emit = flow;
#if ((DEBUG) & DEBUG_I)
						emit_queue++;
#endif
						pthread_mutex_unlock(&emit_mutex);
						flow = *flowpp;
						continue;
					}
				}
				flowpp = &flow->next;
				flow = flow->next;
			} /* chain loop */
			pthread_mutex_unlock(&flows_mutex[i]);
		} /* hash loop */
		if (flows_emit) pthread_cond_signal(&emit_cond);

		/* Re-insert expired fragments as ordinary flows (MOVE_INTO:
		   ownership of the Flow object passes to the table) */
		while (scan_frag_dreg) {
			flow = scan_frag_dreg;
			scan_frag_dreg = flow->next;
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
			*logbuf = 0;
#endif
			put_into(flow, MOVE_INTO
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
					, logbuf
#endif
				);
#if ((DEBUG) & DEBUG_S)
			my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
#endif
		}
	}
}
1081
void *cap_thread()
{
	/*
	   Capture loop: reads packets from the netfilter ULOG socket,
	   parses the IPv4 header (and, when present, the first bytes of
	   the transport layer), fills the next free slot of the pending
	   ring with the new flow, and signals unpending_thread().
	   Terminates when the 'killed' flag is set by a signal handler.
	   */
	struct ulog_packet_msg *ulog_msg;
	struct ip *nl;          /* network layer (IPv4) header */
	void *tl;               /* transport layer header / IP payload */
	struct Flow *flow;
	int len, off_frag, psize;
#if ((DEBUG) & DEBUG_C)
	char buf[64];
	char logbuf[256];
#endif
	int challenge;          /* NOTE(review): unused in this function - candidate for removal */

	setuser();

	while (!killed) {
		len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
		if (len <= 0) {
			my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
			continue;
		}
		/* One read may contain several ULOG messages */
		while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {

#if ((DEBUG) & DEBUG_C)
			sprintf(logbuf, "C: %d", ulog_msg->data_len);
#endif

			nl = (void *) &ulog_msg->payload;
			psize = ulog_msg->data_len;

			/* Sanity check: must hold a full IPv4 header */
			if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
#if ((DEBUG) & DEBUG_C)
				strcat(logbuf, " U");
				my_log(LOG_DEBUG, "%s", logbuf);
#endif
#if ((DEBUG) & DEBUG_I)
				pkts_ignored++;
#endif
				continue;
			}

			/* Nonzero flags mean unpending_thread() has not freed this
			   slot yet - the pending ring is full, drop the packet */
			if (pending_head->flags) {
#if ((DEBUG) & DEBUG_C) || defined MESSAGES
				my_log(LOG_ERR,
# if ((DEBUG) & DEBUG_C)
						"%s %s %s", logbuf,
# else
						"%s %s",
# endif
						"pending queue full:", "packet lost");
#endif
#if ((DEBUG) & DEBUG_I)
				pkts_lost_capture++;
#endif
				goto done;
			}

#if ((DEBUG) & DEBUG_I)
			pkts_total++;
#endif

			flow = pending_head;

			/* ?FIXME? Add sanity check for ip_len? */
			flow->size = ntohs(nl->ip_len);
#if ((DEBUG) & DEBUG_I)
			size_total += flow->size;
#endif

			flow->sip = nl->ip_src;
			flow->dip = nl->ip_dst;
			/* With -M the netfilter mark replaces the TOS byte */
			flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
			
			/* It's going to be expensive calling this syscall on every flow.
			 * We should keep a local hash table, for now just bear the overhead... - Sapan*/

			flow->slice_id=0;

			/* The netfilter mark carries the vserver xid; map it to a
			   slice id (see vserver.h) */
			if (ulog_msg->mark > 0) {
				flow->slice_id = xid_to_slice_id(ulog_msg->mark);
			}

			if (flow->slice_id < 1)
				flow->slice_id = ulog_msg->mark; // Couldn't look up the slice id, let's at least store the local xid


			/* Test facility (see file header): flag flows to/from the
			   hardcoded test address - presumably a local stand-in for
			   corewars.org; TODO confirm */
			if ((flow->dip.s_addr == inet_addr("10.0.0.8")) || (flow->sip.s_addr == inet_addr("10.0.0.8"))) {
				my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->slice_id);
			}
			flow->iif = snmp_index(ulog_msg->indev_name);
			flow->oif = snmp_index(ulog_msg->outdev_name);
			flow->proto = nl->ip_p;
			flow->id = 0;
			flow->tcp_flags = 0;
			flow->pkts = 1;
			flow->sizeF = 0;
			flow->sizeP = 0;
			/* Packets captured from OUTPUT table didn't contains valid timestamp */
			if (ulog_msg->timestamp_sec) {
				flow->ctime.sec = ulog_msg->timestamp_sec;
				flow->ctime.usec = ulog_msg->timestamp_usec;
			} else gettime(&flow->ctime);
			flow->mtime = flow->ctime;

			/* Fragment offset in bytes (ip_off stores it in 8-byte units) */
			off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;

			/*
			   Offset (from network layer) to transport layer header/IP data
			   IOW IP header size ;-)

			   ?FIXME?
			   Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
			   */
			/* NOTE(review): off_tl is not declared locally - it appears
			   to be file-scope state; confirm it is not shared unsafely
			   across threads */
			off_tl = nl->ip_hl << 2;
			tl = (void *) nl + off_tl;

			/* THIS packet data size: data_size = total_size - ip_header_size*4 */
			flow->sizeF = ntohs(nl->ip_len) - off_tl;
			psize -= off_tl;
			if ((signed) flow->sizeF < 0) flow->sizeF = 0;
			if (psize > (signed) flow->sizeF) psize = flow->sizeF;

			if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
				/* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
#if ((DEBUG) & DEBUG_C)
				strcat(logbuf, " F");
#endif
#if ((DEBUG) & DEBUG_I)
				pkts_total_fragmented++;
#endif
				flow->flags |= FLOW_FRAG;
				flow->id = nl->ip_id;

				if (!(ntohs(nl->ip_off) & IP_MF)) {
					/* Packet whith IP_MF contains information about whole datagram size */
					flow->flags |= FLOW_LASTFRAG;
					/* size = frag_offset*8 + data_size */
					flow->sizeP = off_frag + flow->sizeF;
				}
			}

#if ((DEBUG) & DEBUG_C)
			sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
			strcat(logbuf, buf);
			sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
			strcat(logbuf, buf);
#endif

			/*
			   Fortunately most interesting transport layer information fit
			   into first 8 bytes of IP data field (minimal nonzero size).
			   Thus we don't need actual packet reassembling to build whole
			   transport layer data. We only check the fragment offset for
			   zero value to find packet with this information.
			   */
			if (!off_frag && psize >= 8) {
				switch (flow->proto) {
					case IPPROTO_TCP:
					case IPPROTO_UDP:
						/* Source/destination ports share the same offsets
						   in TCP and UDP headers */
						flow->sp = ((struct udphdr *)tl)->uh_sport;
						flow->dp = ((struct udphdr *)tl)->uh_dport;
						goto tl_known;

#ifdef ICMP_TRICK
					case IPPROTO_ICMP:
						/* Encode ICMP type/code in the port fields */
						flow->sp = htons(((struct icmp *)tl)->icmp_type);
						flow->dp = htons(((struct icmp *)tl)->icmp_code);
						goto tl_known;
#endif
#ifdef ICMP_TRICK_CISCO
					case IPPROTO_ICMP:
						flow->dp = *((int32_t *) tl);
						goto tl_known;
#endif

					default:
						/* Unknown transport layer */
#if ((DEBUG) & DEBUG_C)
						strcat(logbuf, " U");
#endif
						flow->sp = 0;
						flow->dp = 0;
						break;

tl_known:
#if ((DEBUG) & DEBUG_C)
						sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
						strcat(logbuf, buf);
#endif
						flow->flags |= FLOW_TL;
				}
			}

			/* Check for tcp flags presence (including CWR and ECE). */
			if (flow->proto == IPPROTO_TCP
					&& off_frag < 16
					&& psize >= 16 - off_frag) {
				/* Byte 13 of the TCP header holds the flags */
				flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
#if ((DEBUG) & DEBUG_C)
				sprintf(buf, " TCP:%x", flow->tcp_flags);
				strcat(logbuf, buf);
#endif
			}

#if ((DEBUG) & DEBUG_C)
			sprintf(buf, " => %x", (unsigned) flow);
			strcat(logbuf, buf);
			my_log(LOG_DEBUG, "%s", logbuf);
#endif

#if ((DEBUG) & DEBUG_I)
			pkts_pending++;
			pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
			if (pending_queue_trace < pending_queue_trace_candidate)
				pending_queue_trace = pending_queue_trace_candidate;
#endif

			/* Flow complete - inform unpending_thread() about it */
			pending_head->flags |= FLOW_PENDING;
			pending_head = pending_head->next;
done:
			pthread_cond_signal(&unpending_cond);
		}
	}
	return 0;
}
1309
1310 /* Copied out of CoDemux */
1311
1312 static int init_daemon() {
1313         pid_t pid;
1314         FILE *pidfile;
1315
1316         pidfile = fopen(PIDFILE, "w");
1317         if (pidfile == NULL) {
1318                 my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
1319         }
1320
1321         if ((pid = fork()) < 0) {
1322                 fclose(pidfile);
1323                 my_log(LOG_ERR, "Could not fork!\n");
1324                 return(-1);
1325         }
1326         else if (pid != 0) {
1327                 /* i'm the parent, writing down the child pid  */
1328                 fprintf(pidfile, "%u\n", pid);
1329                 fclose(pidfile);
1330                 exit(0);
1331         }
1332
1333         /* close the pid file */
1334         fclose(pidfile);
1335
1336         /* routines for any daemon process
1337            1. create a new session 
1338            2. change directory to the root
1339            3. change the file creation permission 
1340            */
1341         setsid();
1342         chdir("/var/local/fprobe");
1343         umask(0);
1344
1345         return(0);
1346 }
1347
1348 int main(int argc, char **argv)
1349 {
1350         char errpbuf[512];
1351         char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1352         int c, i, write_fd, memory_limit = 0;
1353         struct addrinfo hints, *res;
1354         struct sockaddr_in saddr;
1355         pthread_attr_t tattr;
1356         struct sigaction sigact;
1357         static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1358         struct timeval timeout;
1359
1360         sched_min = sched_get_priority_min(SCHED);
1361         sched_max = sched_get_priority_max(SCHED);
1362
1363         memset(&saddr, 0 , sizeof(saddr));
1364         memset(&hints, 0 , sizeof(hints));
1365         hints.ai_flags = AI_PASSIVE;
1366         hints.ai_family = AF_INET;
1367         hints.ai_socktype = SOCK_DGRAM;
1368
1369         /* Process command line options */
1370
1371         opterr = 0;
1372         while ((c = my_getopt(argc, argv, parms)) != -1) {
1373                 switch (c) {
1374                         case '?':
1375                                 usage();
1376
1377                         case 'h':
1378                                 usage();
1379                 }
1380         }
1381
1382         if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1383         if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1384         if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1385         if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1386         if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1387         if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1388         if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1389         if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1390         if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1391         if (parms[nflag].count) {
1392                 switch (atoi(parms[nflag].arg)) {
1393                         case 1:
1394                                 netflow = &NetFlow1;
1395                                 break;
1396
1397                         case 5:
1398                                 break;
1399
1400                         case 7:
1401                                 netflow = &NetFlow7;
1402                                 break;
1403
1404                         default:
1405                                 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1406                                 exit(1);
1407                 }
1408         }
1409         if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1410         if (parms[lflag].count) {
1411                 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1412                         *log_suffix++ = 0;
1413                         if (*log_suffix) {
1414                                 sprintf(errpbuf, "[%s]", log_suffix);
1415                                 strcat(ident, errpbuf);
1416                         }
1417                 }
1418                 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1419                 if (log_suffix) *--log_suffix = ':';
1420         }
1421         if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1422 err_malloc:
1423                 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1424                 exit(1);
1425         }
1426         sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1427         if (parms[qflag].count) {
1428                 pending_queue_length = atoi(parms[qflag].arg);
1429                 if (pending_queue_length < 1) {
1430                         fprintf(stderr, "Illegal %s\n", "pending queue length");
1431                         exit(1);
1432                 }
1433         }
1434         if (parms[rflag].count) {
1435                 schedp.sched_priority = atoi(parms[rflag].arg);
1436                 if (schedp.sched_priority
1437                                 && (schedp.sched_priority < sched_min
1438                                         || schedp.sched_priority > sched_max)) {
1439                         fprintf(stderr, "Illegal %s\n", "realtime priority");
1440                         exit(1);
1441                 }
1442         }
1443         if (parms[Bflag].count) {
1444                 sockbufsize = atoi(parms[Bflag].arg) << 10;
1445         }
	/* -b: bulk allocation quantity for the flow memory pool. */
	if (parms[bflag].count) {
		bulk_quantity = atoi(parms[bflag].arg);
		if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
			fprintf(stderr, "Illegal %s\n", "bulk size");
			exit(1);
		}
	}
	/* -m: memory limit, given in KiB on the command line. */
	if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
	/* -X: SNMP interface rules "basename:base[,basename:base...]".
	   strtok() tokenizes the argument in place; the NUL bytes it writes
	   over the separators are patched back below so the rule string can
	   be re-parsed/reported intact. */
	if (parms[Xflag].count) {
		for(i = 0; parms[Xflag].arg[i]; i++)
			if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
		if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
			goto err_malloc;
		rule = strtok(parms[Xflag].arg, ":");
		for (i = 0; rule; i++) {
			snmp_rules[i].len = strlen(rule);
			if (snmp_rules[i].len > IFNAMSIZ) {
				fprintf(stderr, "Illegal %s\n", "interface basename");
				exit(1);
			}
			/* NOTE(review): strncpy with n == len never writes a NUL
			   terminator into basename; termination appears to rely on
			   the stored len being used everywhere — verify. */
			strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
			/* Restore the ',' separator strtok() NUL-ed out. */
			if (!*(rule - 1)) *(rule - 1) = ',';
			rule = strtok(NULL, ",");
			if (!rule) {
				fprintf(stderr, "Illegal %s\n", "SNMP rule");
				exit(1);
			}
			snmp_rules[i].base = atoi(rule);
			*(rule - 1) = ':';
			rule = strtok(NULL, ":");
		}
		nsnmp_rules = i;
	}
	/* -t: emit rate limit given as "bytes:delay". */
	if (parms[tflag].count)
		sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
	/* -a: local (source) address for outgoing collector sockets. */
	if (parms[aflag].count) {
		if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
bad_lhost:
			/* Also reached via goto from the per-collector "/lhost"
			   parsing further below. */
			fprintf(stderr, "Illegal %s\n", "source address");
			exit(1);
		} else {
			saddr = *((struct sockaddr_in *) res->ai_addr);
			freeaddrinfo(res);
		}
	}
	/* -u: resolve the user to drop privileges to after setup. */
	if (parms[uflag].count) 
		if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
			fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
			exit(1);
		}
1496
1497
	/* Process collectors parameters. Brrrr... :-[ */

	npeers = argc - optind;
	if (npeers > 1) {
		/* Send to remote Netflow collector(s); each positional argument
		   is "host:port[/lhost[/type]]", type 'm' = mirror (default),
		   'r' = rotate.
		   NOTE(review): with exactly one positional argument npeers == 1
		   and this branch is skipped, so a single collector works only
		   together with -f; confirm "> 1" vs ">= 1" is intended in this
		   fork. */
		if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
		for (i = optind, npeers = 0; i < argc; i++, npeers++) {
			dhost = argv[i];
			if (!(dport = strchr(dhost, ':'))) goto bad_collector;
			*dport++ = 0;	/* split "host:port" in place */
			if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
				fprintf(stderr, "socket(): %s\n", strerror(errno));
				exit(1);
			}
			peers[npeers].write_fd = write_fd;
			peers[npeers].type = PEER_MIRROR;
			peers[npeers].laddr = saddr;
			peers[npeers].seq = 0;
			/* Optional "/lhost[/type]" suffix. */
			if ((lhost = strchr(dport, '/'))) {
				*lhost++ = 0;
				if ((type = strchr(lhost, '/'))) {
					*type++ = 0;
					switch (*type) {
						case 0:
						case 'm':
							break;

						case 'r':
							peers[npeers].type = PEER_ROTATE;
							npeers_rot++;
							break;

						default:
							goto bad_collector;
					}
				}
				/* An empty lhost keeps the global -a source address. */
				if (*lhost) {
					if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
					peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
					freeaddrinfo(res);
				}
			}
			if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
						sizeof(struct sockaddr_in))) {
				fprintf(stderr, "bind(): %s\n", strerror(errno));
				exit(1);
			}
			if (getaddrinfo(dhost, dport, &hints, &res)) {
bad_collector:
				fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
				exit(1);
			}
			peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
			freeaddrinfo(res);
			/* Connect the UDP socket so plain send() works later on. */
			if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
						sizeof(struct sockaddr_in))) {
				fprintf(stderr, "connect(): %s\n", strerror(errno));
				exit(1);
			}

			/* Restore command line (put back the separators NUL-ed
			   above) so argv stays readable in ps. */
			if (type) *--type = '/';
			if (lhost) *--lhost = '/';
			*--dport = ':';
		}
	}
1564         else if (parms[fflag].count) {
1565                 // log into a file
1566                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1567                 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1568                 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1569
1570                 peers[npeers].write_fd = START_DATA_FD;
1571                 peers[npeers].type = PEER_FILE;
1572                 peers[npeers].seq = 0;
1573
1574                 read_cur_epoch();
1575                 npeers++;
1576         }
1577         else 
1578                 usage();
1579
1580
	if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
	/* Attach to the netfilter ULOG netlink multicast group(s). */
	ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
	if (!ulog_handle) {
		fprintf(stderr, "libipulog initialization error: %s",
				ipulog_strerror(ipulog_errno));
		exit(1);
	}
	/* -B: enlarge the netlink receive buffer; failure is non-fatal. */
	if (sockbufsize)
		if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
					&sockbufsize, sizeof(sockbufsize)) < 0)
			fprintf(stderr, "setsockopt(): %s", strerror(errno));

	/* Daemonize (if log destination stdout-free) */

	my_log_open(ident, verbosity, log_dest);

	init_daemon();

	if (!(log_dest & 2)) {
		/* Crash-proofing - Sapan
		   The parent parks in wait3() and re-forks a fresh worker each
		   time the child dies; only the child breaks out of this loop
		   to run the collector proper. */
		while (1) {
			int pid=fork();
			if (pid==-1) {
				fprintf(stderr, "fork(): %s", strerror(errno));
				exit(1);
			}
			else if (pid==0) {
				/* Child: detach from the controlling terminal and
				   silence the standard streams. */
				setsid();
				freopen("/dev/null", "r", stdin);
				freopen("/dev/null", "w", stdout);
				freopen("/dev/null", "w", stderr);
				break;
			}
			else {
				while (wait3(NULL,0,NULL) < 1);
			}
		}
	} else {
		/* Logging to stdout: keep output unbuffered for live viewing. */
		setvbuf(stdout, (char *)0, _IONBF, 0);
		setvbuf(stderr, (char *)0, _IONBF, 0);
	}

	pid = getpid();
	sprintf(errpbuf, "[%ld]", (long) pid);
	strcat(ident, errpbuf);	/* append "[pid]" to the log ident */

	/* Initialization */

	init_slice_id_hash();
	hash_init(); /* Actually for crc16 only */
	mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
	for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1633
#ifdef UPTIME_TRICK
	/* Hope 12 days is enough :-/ */
	start_time_offset = 1 << 20;

	/* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
#endif
	gettime(&start_time);

	/*
	   Build static pending queue as circular buffer.
	   */
	if (!(pending_head = mem_alloc())) goto err_mem_alloc;
	pending_tail = pending_head;
	for (i = pending_queue_length - 1; i--;) {
		if (!(pending_tail->next = mem_alloc())) {
err_mem_alloc:
			my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
			exit(1);
		}
		pending_tail = pending_tail->next;
	}
	pending_tail->next = pending_head;	/* close the ring */
	pending_tail = pending_head;

	/* Install the handler and block SIGTERM (and SIGUSR1 under DEBUG_I)
	   in every thread; the main loop below polls the 'sigs' mask. */
	sigemptyset(&sig_mask);
	sigact.sa_handler = &sighandler;
	sigact.sa_mask = sig_mask;
	sigact.sa_flags = 0;
	sigaddset(&sig_mask, SIGTERM);
	sigaction(SIGTERM, &sigact, 0);
#if ((DEBUG) & DEBUG_I)
	sigaddset(&sig_mask, SIGUSR1);
	sigaction(SIGUSR1, &sigact, 0);
#endif
	if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
		my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
		exit(1);
	}
1672
	my_log(LOG_INFO, "Starting %s...", VERSION);

	/* -c: chroot into the given directory before dropping privileges. */
	if (parms[cflag].count) {
		if (chdir(parms[cflag].arg) || chroot(".")) {
			my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
			exit(1);
		}
	}

	/* Spawn the worker threads, stepping the realtime priority up by one
	   for each successive thread (skipped when priority <= 0). */
	schedp.sched_priority = schedp.sched_priority - THREADS + 2;
	pthread_attr_init(&tattr);
	for (i = 0; i < THREADS - 1; i++) {
		if (schedp.sched_priority > 0) {
			if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
					(pthread_attr_setschedparam(&tattr, &schedp))) {
				my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
				exit(1);
			}
		}
		if (pthread_create(&thid, &tattr, threads[i], 0)) {
			my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
			exit(1);
		}
		pthread_detach(thid);
		schedp.sched_priority++;
	}

	/* Drop root privileges: supplementary groups, then gid, then uid —
	   this order is required or the later steps would fail. */
	if (pw) {
		if (setgroups(0, NULL)) {
			my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
			exit(1);
		}
		if (setregid(pw->pw_gid, pw->pw_gid)) {
			my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
			exit(1);
		}
		if (setreuid(pw->pw_uid, pw->pw_uid)) {
			my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
			exit(1);
		}
	}

	/* Write the pid file; failure is logged but not fatal. */
	if (!(pidfile = fopen(pidfilepath, "w")))
		my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
	else {
		fprintf(pidfile, "%ld\n", (long) pid);
		fclose(pidfile);
	}

	/* Echo the effective configuration into the log for debugging. */
	my_log(LOG_INFO, "pid: %d", pid);
	my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
			"M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
			ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
			netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
			memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
			emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
			parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
	for (i = 0; i < nsnmp_rules; i++) {
		my_log(LOG_INFO, "SNMP rule #%d %s:%d",
				i + 1, snmp_rules[i].basename, snmp_rules[i].base);
	}
1734         for (i = 0; i < npeers; i++) {
1735                 switch (peers[i].type) {
1736                         case PEER_MIRROR:
1737                                 c = 'm';
1738                                 break;
1739                         case PEER_ROTATE:
1740                                 c = 'r';
1741                                 break;
1742                 }
1743                 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1744                 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1745                                 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1746         }
1747
	/* Setup complete: start delivering the blocked signals. */
	pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);

	timeout.tv_usec = 0;
	/* Main supervision loop: sleep up to scan_interval and react to
	   signals.  After SIGTERM, keep looping until the flow cache,
	   emitter and pending queue have fully drained. */
	while (!killed
			|| (total_elements - free_elements - pending_queue_length)
			|| emit_count
			|| pending_tail->flags) {

		if (!sigs) {
			timeout.tv_sec = scan_interval;
			select(0, 0, 0, 0, &timeout);	/* interruptible sleep */
		}

		if (sigs & SIGTERM_MASK && !killed) {
			sigs &= ~SIGTERM_MASK;
			my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
			/* Collapse every timeout to its minimum so the worker
			   threads expire and emit all cached flows immediately. */
			scan_interval = 1;
			frag_lifetime = -1;
			active_lifetime = -1;
			inactive_lifetime = -1;
			emit_timeout = 1;
			unpending_timeout = 1;
			killed = 1;
			pthread_cond_signal(&scan_cond);
			pthread_cond_signal(&unpending_cond);
		}

#if ((DEBUG) & DEBUG_I)
		if (sigs & SIGUSR1_MASK) {
			sigs &= ~SIGUSR1_MASK;
			info_debug();
		}
#endif
	}
	remove(pidfilepath);
#if ((DEBUG) & DEBUG_I)
	info_debug();
#endif
	my_log(LOG_INFO, "Done.");
#ifdef WALL
	/* NOTE(review): without WALL defined, main() falls off the end;
	   C99 defines that as returning 0, but older compilers may warn. */
	return 0;
#endif
}