Bug fix.
[fprobe-ulog.git] / src / fprobe-ulog.c
1 /*
2         Copyright (C) Slava Astashonok <sla@0n.ru>
3
4         This program is free software; you can redistribute it and/or
5         modify it under the terms of the GNU General Public License.
6
7         $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
8
9                         Sapan Bhatia <sapanb@cs.princeton.edu> 
10                         
11         7/11/2007       Added data collection (-f) functionality, xid support in the header and log file
12                         rotation.
13
14         15/11/2007      Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
15
16 */
17
18 #include <common.h>
19
20 /* stdout, stderr, freopen() */
21 #include <stdio.h>
22
23 /* atoi(), exit() */
24 #include <stdlib.h>
25
/* getopt(), alarm(), getpid(), setsid(), chdir() */
27 #include <unistd.h>
28
29 /* strerror() */
30 #include <string.h>
31
32 /* sig*() */
33 #include <signal.h>
34
35 /* statfs() */
36
37 #include <sys/vfs.h>
38
39 #include <libipulog/libipulog.h>
/*
 * Local re-declaration of libipulog's handle structure.  The library header
 * included just above presumably exposes `struct ipulog_handle` only as an
 * opaque type, so the layout is repeated here to allow direct access to its
 * members (e.g. the socket descriptor `fd`).
 * NOTE(review): this must match, member for member, the definition compiled
 * into the libipulog being linked against — re-check whenever it changes.
 */
struct ipulog_handle {
	int fd;				/* presumably the netlink socket — TODO confirm */
	u_int8_t blocking;
	struct sockaddr_nl local;
	struct sockaddr_nl peer;
	struct nlmsghdr* last_nlhdr;
};
47
48 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
49 #include <sys/types.h>
50 #include <netinet/in_systm.h>
51 #include <sys/socket.h>
52 #include <netinet/in.h>
53 #include <arpa/inet.h>
54 #include <netinet/ip.h>
55 #include <netinet/tcp.h>
56 #include <netinet/udp.h>
57 #include <netinet/ip_icmp.h>
58 #include <net/if.h>
59
60 #include <sys/param.h>
61 #include <pwd.h>
62 #ifdef OS_LINUX
63 #include <grp.h>
64 #endif
65
66 /* pthread_*() */
67 #include <pthread.h>
68
69 /* errno */
70 #include <errno.h>
71
72 /* getaddrinfo() */
73 #include <netdb.h>
74
75 /* nanosleep() */
76 #include <time.h>
77
78 /* gettimeofday() */
79 #include <sys/time.h>
80
81 /* scheduling */
82 #include <sched.h>
83
84 /* select() (POSIX)*/
85 #include <sys/select.h>
86
87 /* open() */
88 #include <sys/stat.h>
89 #include <fcntl.h>
90
91 #include <fprobe-ulog.h>
92 #include <my_log.h>
93 #include <my_getopt.h>
94 #include <netflow.h>
95 #include <hash.h>
96 #include <mem.h>
97
/*
 * Default pid file path.
 * NOTE(review): pid files normally live under /var/run; confirm that
 * /var/log is intended and where PIDFILE is combined with the -l identifier.
 */
#define PIDFILE "/var/log/fprobe-ulog.pid"
/* When defined, emit_thread() pads every outgoing PDU up to NETFLOW_PDU_SIZE bytes. */
#define STD_NETFLOW_PDU
100
101 enum {
102         aflag,
103         Bflag,
104         bflag,
105         cflag,
106         dflag,
107         Dflag,
108         eflag,
109         Eflag,
110         fflag,
111         gflag,
112         hflag,
113         lflag,
114         mflag,
115         Mflag,
116         nflag,
117         qflag,
118         rflag,
119         sflag,
120         tflag,
121         Tflag,
122         Uflag,
123         uflag,
124         vflag,
125         Wflag,
126         Xflag,
127 };
128
129 static struct getopt_parms parms[] = {
130         {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
131         {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
132         {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
133         {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
134         {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
135         {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
136         {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
137         {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
138         {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
139         {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
140         {'h', 0, 0, 0},
141         {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
142         {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
143         {'M', 0, 0, 0},
144         {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
145         {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
146         {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
147         {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
148         {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
149         {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
150         {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
151         {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
152         {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
153         {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
154         {0, 0, 0, 0}
155 };
156
extern char *optarg;
extern int optind, opterr, optopt;
/*
 * errno is provided by <errno.h>.  It must not be re-declared as
 * `extern int errno;`: on thread-safe C libraries errno is a macro
 * (e.g. `(*__errno_location())`) and that declaration fails to compile.
 */

extern struct NetFlow NetFlow1;
extern struct NetFlow NetFlow5;
extern struct NetFlow NetFlow7;
164
/* "No log file opened yet" value for a file peer's descriptor; get_log_fd()
 * skips the free-space check when cur_fd equals it. */
#define START_VALUE -5
/* Nonzero when -M was given (its parms[] occurrence count): use the
 * netfilter mark value as the ToS field. */
#define mark_is_tos parms[Mflag].count
/* Seconds between scans for expired flows (presumably -s). */
static unsigned scan_interval = 5;
/* Free disk blocks to preserve (presumably -D); 0 disables the check in get_log_fd(). */
static int min_free = 0;
/* Fragmented flow lifetime in seconds (presumably -g). */
static int frag_lifetime = 30;
/* Inactive (idle) flow lifetime in seconds — TODO confirm which option sets it. */
static int inactive_lifetime = 60;
/* Active flow lifetime in seconds (presumably -e). */
static int active_lifetime = 300;
/* Kernel capture buffer size (presumably -B) — TODO confirm units. */
static int sockbufsize;
/* Largest value representable in mem_index_t; upper bound shown for -b in usage(). */
#define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
/* Default memory bulk size, chosen from the MEM_BITS build setting. */
#if (MEM_BITS == 0) || (MEM_BITS == 16)
#define BULK_QUANTITY 10000
#else
#define BULK_QUANTITY 200
#endif
179
/*
 * Log-file rotation (see get_log_fd()):
 *   epoch_length - epoch size in minutes;
 *   log_epochs   - number of "<file>.<n>" files cycled through (modulus).
 */
static unsigned epoch_length=60, log_epochs=1;
/*
 *   cur_epoch   - index of the log file currently written; get_log_fd() sets
 *                 it to -1 (UINT_MAX) as a "disk almost full" marker;
 *   prev_uptime - uptime in minutes at the last rotation;
 *   last_peak   - epoch opened by the most recent rotation.
 */
static unsigned cur_epoch=0,prev_uptime=0,last_peak=0;

/* Memory bulk size and pending queue length (see -b / -q in usage()). */
static unsigned bulk_quantity = BULK_QUANTITY;
static unsigned pending_queue_length = 100;
/* Export format in use; NetFlow v5 by default. */
static struct NetFlow *netflow = &NetFlow5;
/* Maximum log level and log destination (see -v / -l in usage()). */
static unsigned verbosity = 6;
static unsigned log_dest = MY_LOG_SYSLOG;
/* Reference instant for getuptime()/getuptime_minutes(); with UPTIME_TRICK,
 * emit_thread() resets it and subtracts start_time_offset seconds. */
static struct Time start_time;
static long start_time_offset;
static int off_tl;
/* From mem.c */
extern unsigned total_elements;
extern unsigned free_elements;
extern unsigned total_memory;
#if ((DEBUG) & DEBUG_I)
/* Debug statistics, reported by info_debug(). */
static unsigned emit_pkts, emit_queue;
static uint64_t size_total;
static unsigned pkts_total, pkts_total_fragmented;
static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
static unsigned pkts_pending, pkts_pending_done;
static unsigned pending_queue_trace, pending_queue_trace_candidate;
static unsigned flows_total, flows_fragmented;
#endif
/* Number of flow records currently placed in emit_packet. */
static unsigned emit_count;
static uint32_t emit_sequence;
/* Output rate limiting: emit_thread() derives a delay from bytes sent /
 * emit_rate_bytes (see -t in usage()). */
static unsigned emit_rate_bytes, emit_rate_delay;
/* Timestamp written into the NetFlow header; taken just before sending. */
static struct Time emit_time;
/* Buffer in which the outgoing PDU (header + flow records) is assembled. */
static uint8_t emit_packet[NETFLOW_MAX_PACKET];
static pthread_t thid;
static sigset_t sig_mask;
static struct sched_param schedp;
/* Real-time priority range shown for -r in usage(). */
static int sched_min, sched_max;
/* Output peers (collectors or PEER_FILE log files) iterated by emit_thread(). */
static int npeers, npeers_rot;
static struct peer *peers;
/* Bitmask of received signals, set by sighandler(). */
static int sigs;
216
/* Flow cache: hash buckets; put_into() holds flows_mutex[h] while it
 * searches or modifies bucket flows[h]. */
static struct Flow *flows[1 << HASH_BITS];
static pthread_mutex_t flows_mutex[1 << HASH_BITS];

static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;

static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
/* Presumably the queue of captured packets not yet merged into the cache
 * (see -q) — TODO confirm which mutex guards it. */
static struct Flow *pending_head, *pending_tail;
static struct Flow *scan_frag_dreg;

/* Flows ready for export: emit_thread() pops flows_emit under emit_mutex
 * and waits on emit_cond while the list is empty. */
static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
static struct Flow *flows_emit;

/* Log/pid file identifier (presumably overridable with -l ...:id). */
static char ident[256] = "fprobe-ulog";
static FILE *pidfile;
static char *pidfilepath;
static pid_t pid;
static int killed;
/* Condition-wait timeouts in seconds (emit_thread() adds emit_timeout to
 * the current time before pthread_cond_timedwait()). */
static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
/* libipulog capture state; ulog_gmask is the ULOG group mask (default 1). */
static struct ipulog_handle *ulog_handle;
static uint32_t ulog_gmask = 1;
static char *cap_buf;
/* Interface-name to SNMP-index rules, consulted by snmp_index(). */
static int nsnmp_rules;
static struct snmp_rule *snmp_rules;
/* Account to run as; setuser() switches to it when non-NULL. */
static struct passwd *pw = 0;
244
245 void usage()
246 {
247         fprintf(stdout,
248                         "fprobe-ulog: a NetFlow probe. Version %s\n"
249                         "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
250                         "\n"
251                         "-h\t\tDisplay this help\n"
252                         "-U <mask>\tULOG group bitwise mask [1]\n"
253                         "-s <seconds>\tHow often scan for expired flows [5]\n"
254                         "-g <seconds>\tFragmented flow lifetime [30]\n"
255                         "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
256                         "-f <filename>\tLog flow data in a file\n"
257                         "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
258                         "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
259                         "-a <address>\tUse <address> as source for NetFlow flow\n"
260                         "-X <rules>\tInterface name to SNMP-index conversion rules\n"
261                         "-M\t\tUse netfilter mark value as ToS flag\n"
262                         "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
263                         "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
264                         "-q <flows>\tPending queue length [100]\n"
265                         "-B <kilobytes>\tKernel capture buffer size [0]\n"
266                         "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
267                         "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
268                         "-c <directory>\tDirectory to chroot to\n"
269                         "-u <user>\tUser to run as\n"
270                         "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
271                         "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
272                         "-y <remote:port>\tAddress of the NetFlow collector\n"
273                         "-f <writable file>\tFile to write data into\n"
274                         "-T <n>\tRotate log file every n epochs\n"
275                         "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
276                         "-E <[1..60]>\tSize of an epoch in minutes\n"
277                         "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
278                         ,
279                 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
280         exit(0);
281 }
282
#if ((DEBUG) & DEBUG_I)
/*
 * Dump the internal statistics counters at LOG_DEBUG level (only built
 * when DEBUG includes DEBUG_I).
 *
 * All counters are unsigned, so they are printed with %u; the 64-bit byte
 * total is cast to unsigned long long so that %llu matches on every ABI
 * (uint64_t is `unsigned long` on LP64 targets, not `long long`).
 */
void info_debug()
{
	my_log(LOG_DEBUG, "I: received:%u/%u (%llu) pending:%u/%u",
			pkts_total, pkts_total_fragmented, (unsigned long long) size_total,
			pkts_pending - pkts_pending_done, pending_queue_trace);
	my_log(LOG_DEBUG, "I: ignored:%u lost:%u+%u",
			pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
	my_log(LOG_DEBUG, "I: cache:%u/%u emit:%u/%u/%u",
			flows_total, flows_fragmented, (unsigned) emit_sequence, emit_pkts, emit_queue);
	my_log(LOG_DEBUG, "I: memory:%u/%u (%u)",
			total_elements, free_elements, total_memory);
}
#endif
297
298 void sighandler(int sig)
299 {
300         switch (sig) {
301                 case SIGTERM:
302                         sigs |= SIGTERM_MASK;
303                         break;
304 #if ((DEBUG) & DEBUG_I)
305                 case SIGUSR1:
306                         sigs |= SIGUSR1_MASK;
307                         break;
308 #endif
309         }
310 }
311
312 void gettime(struct Time *now)
313 {
314         struct timeval t;
315
316         gettimeofday(&t, 0);
317         now->sec = t.tv_sec;
318         now->usec = t.tv_usec;
319 }
320
321
322 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
323 {
324         return (t1->sec - t2->sec)/60;
325 }
326
327 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
328 {
329         return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
330 }
331
332 /* Uptime in miliseconds */
333 uint32_t getuptime(struct Time *t)
334 {
335         /* Maximum uptime is about 49/2 days */
336         return cmpmtime(t, &start_time);
337 }
338
339 /* Uptime in minutes */
340 uint32_t getuptime_minutes(struct Time *t)
341 {
342         /* Maximum uptime is about 49/2 days */
343         return cmpMtime(t, &start_time);
344 }
345
346 hash_t hash_flow(struct Flow *flow)
347 {
348         if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
349         else return hash(flow, sizeof(struct Flow_TL));
350 }
351
352 uint16_t snmp_index(char *name) {
353         uint32_t i;
354
355         if (!*name) return 0;
356
357         for (i = 0; (int) i < nsnmp_rules; i++) {
358                 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
359                 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
360         }
361
362         if ((i = if_nametoindex(name))) return i;
363
364         return -1;
365 }
366
367 inline void copy_flow(struct Flow *src, struct Flow *dst)
368 {
369         dst->iif = src->iif;
370         dst->oif = src->oif;
371         dst->sip = src->sip;
372         dst->dip = src->dip;
373         dst->tos = src->tos;
374         dst->proto = src->proto;
375         dst->tcp_flags = src->tcp_flags;
376         dst->id = src->id;
377         dst->sp = src->sp;
378         dst->dp = src->dp;
379         dst->pkts = src->pkts;
380         dst->size = src->size;
381         dst->sizeF = src->sizeF;
382         dst->sizeP = src->sizeP;
383         dst->ctime = src->ctime;
384         dst->mtime = src->mtime;
385         dst->flags = src->flags;
386 }
387
388 void get_cur_epoch() {
389         int fd;
390         fd = open("/tmp/fprobe_last_epoch",O_RDONLY);
391         if (fd != -1) {
392                 char snum[7];
393                 ssize_t len;
394                 len = read(fd, snum, sizeof(snum)-1);
395                 if (len != -1) {
396                         snum[len]='\0';
397                         sscanf(snum,"%d",&cur_epoch);
398                         cur_epoch++; /* Let's not stone the last epoch */
399                         close(fd);
400                 }
401         }
402         return;
403 }
404
405
/*
 * Record epoch number n in /tmp/fprobe_last_epoch so that get_cur_epoch()
 * can resume from it after a restart.  Failures are logged, never fatal.
 */
void update_cur_epoch_file(int n) {
	int fd, len;
	char snum[16];	/* large enough for any int in decimal plus the NUL */

	len = snprintf(snum, sizeof(snum), "%d", n);
	if (len < 0 || (size_t) len >= sizeof(snum))
		return;

	/* O_CREAT requires an explicit mode; without it the permissions were
	 * taken from whatever happened to be in the missing vararg. */
	fd = open("/tmp/fprobe_last_epoch", O_WRONLY|O_CREAT|O_TRUNC, 0644);
	if (fd == -1) {
		my_log(LOG_ERR, "open() failed: /tmp/fprobe_last_epoch.The next restart will resume logging from epoch id 0.");
		return;
	}
	if (write(fd, snum, len) != len)
		my_log(LOG_ERR, "write(): /tmp/fprobe_last_epoch (%s)", strerror(errno));
	close(fd);
}
418
419 unsigned get_log_fd(char *fname, int cur_fd) {
420         struct Time now;
421         unsigned cur_uptime;
422         /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
423          * doesn't solve the problem */
424
425         struct statfs statfs;
426         int ret_fd;
427         gettime(&now);
428         cur_uptime = getuptime_minutes(&now);
429
430
431         if (cur_fd!=START_VALUE) {
432                 if (fstatfs(cur_fd, &statfs) == -1) {
433                         my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
434                 }
435                 else {
436                         if (min_free && (statfs.f_bavail < min_free)
437                                         && (cur_epoch==last_peak))
438                         {
439                                 my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
440                                 cur_epoch = -1;
441                         }
442                         /*
443                            else 
444                            assume that we can reclaim space by overwriting our own files
445                            and that the difference in size will not fill the disk - sapan
446                            */
447                 }
448         }
449
450         /* Epoch length in minutes */
451         if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
452                 char nextname[MAX_PATH_LEN];
453                 int write_fd;
454                 prev_uptime = cur_uptime;
455                 cur_epoch = (cur_epoch + 1) % log_epochs;
456                 last_peak = cur_epoch;
457                 if (cur_fd>0)
458                         close(cur_fd);
459                 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
460                 if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC)) < 0) {
461                         my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
462                         exit(1);
463                 }
464                 update_cur_epoch_file(cur_epoch);
465                 ret_fd = write_fd;
466         }
467         else {
468                 ret_fd = cur_fd;
469         }
470         return(ret_fd);
471 }
472
473 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
474 {
475         struct Flow **flowpp;
476
477 #ifdef WALL
478         flowpp = 0;
479 #endif
480
481         if (prev) flowpp = *prev;
482
483         while (where) {
484                 if (where->sip.s_addr == what->sip.s_addr
485                                 && where->dip.s_addr == what->dip.s_addr
486                                 && where->proto == what->proto) {
487                         switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
488                                 case 0:
489                                         /* Both unfragmented */
490                                         if ((what->sp == where->sp)
491                                                         && (what->dp == where->dp)) goto done;
492                                         break;
493                                 case 2:
494                                         /* Both fragmented */
495                                         if (where->id == what->id) goto done;
496                                         break;
497                         }
498                 }
499                 flowpp = &where->next;
500                 where = where->next;
501         }
502 done:
503         if (prev) *prev = flowpp;
504         return where;
505 }
506
507         int put_into(struct Flow *flow, int flag
508 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
509                         , char *logbuf
510 #endif
511                     )
512 {
513         int ret = 0;
514         hash_t h;
515         struct Flow *flown, **flowpp;
516 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
517         char buf[64];
518 #endif
519
520         h = hash_flow(flow);
521 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
522         sprintf(buf, " %x H:%04x", (unsigned) flow, h);
523         strcat(logbuf, buf);
524 #endif
525         pthread_mutex_lock(&flows_mutex[h]);
526         flowpp = &flows[h];
527         if (!(flown = find(flows[h], flow, &flowpp))) {
528                 /* No suitable flow found - add */
529                 if (flag == COPY_INTO) {
530                         if ((flown = mem_alloc())) {
531                                 copy_flow(flow, flown);
532                                 flow = flown;
533                         } else {
534 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
535                                 my_log(LOG_ERR, "%s %s. %s",
536                                                 "mem_alloc():", strerror(errno), "packet lost");
537 #endif
538                                 return -1;
539                         }
540                 }
541                 flow->next = flows[h];
542                 flows[h] = flow;
543 #if ((DEBUG) & DEBUG_I)
544                 flows_total++;
545                 if (flow->flags & FLOW_FRAG) flows_fragmented++;
546 #endif
547 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
548                 if (flown) {
549                         sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
550                         strcat(logbuf, buf);
551                 }
552 #endif
553         } else {
554                 /* Found suitable flow - update */
555 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
556                 sprintf(buf, " +> %x", (unsigned) flown);
557                 strcat(logbuf, buf);
558 #endif
559                 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
560                         flown->mtime = flow->mtime;
561                 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
562                         flown->ctime = flow->ctime;
563                 flown->tcp_flags |= flow->tcp_flags;
564                 flown->size += flow->size;
565                 flown->pkts += flow->pkts;
566                 if (flow->flags & FLOW_FRAG) {
567                         /* Fragmented flow require some additional work */
568                         if (flow->flags & FLOW_TL) {
569                                 /*
570                                    ?FIXME?
571                                    Several packets with FLOW_TL (attack)
572                                    */
573                                 flown->sp = flow->sp;
574                                 flown->dp = flow->dp;
575                         }
576                         if (flow->flags & FLOW_LASTFRAG) {
577                                 /*
578                                    ?FIXME?
579                                    Several packets with FLOW_LASTFRAG (attack)
580                                    */
581                                 flown->sizeP = flow->sizeP;
582                         }
583                         flown->flags |= flow->flags;
584                         flown->sizeF += flow->sizeF;
585                         if ((flown->flags & FLOW_LASTFRAG)
586                                         && (flown->sizeF >= flown->sizeP)) {
587                                 /* All fragments received - flow reassembled */
588                                 *flowpp = flown->next;
589                                 pthread_mutex_unlock(&flows_mutex[h]);
590 #if ((DEBUG) & DEBUG_I)
591                                 flows_total--;
592                                 flows_fragmented--;
593 #endif
594                                 flown->id = 0;
595                                 flown->flags &= ~FLOW_FRAG;
596 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
597                                 strcat(logbuf," R");
598 #endif
599                                 ret = put_into(flown, MOVE_INTO
600 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
601                                                 , logbuf
602 #endif
603                                               );
604                         }
605                 }
606                 if (flag == MOVE_INTO) mem_free(flow);
607         }
608         pthread_mutex_unlock(&flows_mutex[h]);
609         return ret;
610 }
611
612 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
613 {
614         int i;
615
616         for (i = 0; i < fields; i++) {
617 #if ((DEBUG) & DEBUG_F)
618                 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
619 #endif
620                 switch (format[i]) {
621                         case NETFLOW_IPV4_SRC_ADDR:
622                                 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
623                                 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
624                                 break;
625
626                         case NETFLOW_IPV4_DST_ADDR:
627                                 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
628                                 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
629                                         my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
630                                 }
631                                 p += NETFLOW_IPV4_DST_ADDR_SIZE;
632                                 break;
633
634                         case NETFLOW_INPUT_SNMP:
635                                 *((uint16_t *) p) = htons(flow->iif);
636                                 p += NETFLOW_INPUT_SNMP_SIZE;
637                                 break;
638
639                         case NETFLOW_OUTPUT_SNMP:
640                                 *((uint16_t *) p) = htons(flow->oif);
641                                 p += NETFLOW_OUTPUT_SNMP_SIZE;
642                                 break;
643
644                         case NETFLOW_PKTS_32:
645                                 *((uint32_t *) p) = htonl(flow->pkts);
646                                 p += NETFLOW_PKTS_32_SIZE;
647                                 break;
648
649                         case NETFLOW_BYTES_32:
650                                 *((uint32_t *) p) = htonl(flow->size);
651                                 p += NETFLOW_BYTES_32_SIZE;
652                                 break;
653
654                         case NETFLOW_FIRST_SWITCHED:
655                                 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
656                                 p += NETFLOW_FIRST_SWITCHED_SIZE;
657                                 break;
658
659                         case NETFLOW_LAST_SWITCHED:
660                                 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
661                                 p += NETFLOW_LAST_SWITCHED_SIZE;
662                                 break;
663
664                         case NETFLOW_L4_SRC_PORT:
665                                 *((uint16_t *) p) = flow->sp;
666                                 p += NETFLOW_L4_SRC_PORT_SIZE;
667                                 break;
668
669                         case NETFLOW_L4_DST_PORT:
670                                 *((uint16_t *) p) = flow->dp;
671                                 p += NETFLOW_L4_DST_PORT_SIZE;
672                                 break;
673
674                         case NETFLOW_PROT:
675                                 *((uint8_t *) p) = flow->proto;
676                                 p += NETFLOW_PROT_SIZE;
677                                 break;
678
679                         case NETFLOW_SRC_TOS:
680                                 *((uint8_t *) p) = flow->tos;
681                                 p += NETFLOW_SRC_TOS_SIZE;
682                                 break;
683
684                         case NETFLOW_TCP_FLAGS:
685                                 *((uint8_t *) p) = flow->tcp_flags;
686                                 p += NETFLOW_TCP_FLAGS_SIZE;
687                                 break;
688
689                         case NETFLOW_VERSION:
690                                 *((uint16_t *) p) = htons(netflow->Version);
691                                 p += NETFLOW_VERSION_SIZE;
692                                 break;
693
694                         case NETFLOW_COUNT:
695                                 *((uint16_t *) p) = htons(emit_count);
696                                 p += NETFLOW_COUNT_SIZE;
697                                 break;
698
699                         case NETFLOW_UPTIME:
700                                 *((uint32_t *) p) = htonl(getuptime(&emit_time));
701                                 p += NETFLOW_UPTIME_SIZE;
702                                 break;
703
704                         case NETFLOW_UNIX_SECS:
705                                 *((uint32_t *) p) = htonl(emit_time.sec);
706                                 p += NETFLOW_UNIX_SECS_SIZE;
707                                 break;
708
709                         case NETFLOW_UNIX_NSECS:
710                                 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
711                                 p += NETFLOW_UNIX_NSECS_SIZE;
712                                 break;
713
714                         case NETFLOW_FLOW_SEQUENCE:
715                                 //*((uint32_t *) p) = htonl(emit_sequence);
716                                 *((uint32_t *) p) = 0;
717                                 p += NETFLOW_FLOW_SEQUENCE_SIZE;
718                                 break;
719
720                         case NETFLOW_PAD8:
721                                 /* Unsupported (uint8_t) */
722                         case NETFLOW_ENGINE_TYPE:
723                         case NETFLOW_ENGINE_ID:
724                         case NETFLOW_FLAGS7_1:
725                         case NETFLOW_SRC_MASK:
726                         case NETFLOW_DST_MASK:
727                                 *((uint8_t *) p) = 0;
728                                 p += NETFLOW_PAD8_SIZE;
729                                 break;
730                         case NETFLOW_XID:
731                                 *((uint16_t *) p) = flow->tos;
732                                 p += NETFLOW_XID_SIZE;
733                                 break;
734                         case NETFLOW_PAD16:
735                                 /* Unsupported (uint16_t) */
736                         case NETFLOW_SRC_AS:
737                         case NETFLOW_DST_AS:
738                         case NETFLOW_FLAGS7_2:
739                                 *((uint16_t *) p) = 0;
740                                 p += NETFLOW_PAD16_SIZE;
741                                 break;
742
743                         case NETFLOW_PAD32:
744                                 /* Unsupported (uint32_t) */
745                         case NETFLOW_IPV4_NEXT_HOP:
746                         case NETFLOW_ROUTER_SC:
747                                 *((uint32_t *) p) = 0;
748                                 p += NETFLOW_PAD32_SIZE;
749                                 break;
750
751                         default:
752                                 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
753                                                 format, i, format[i]);
754                                 exit(1);
755                 }
756         }
757 #if ((DEBUG) & DEBUG_F)
758         my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
759 #endif
760         return p;
761 }
762
763 void setuser() {
764         /*
765            Workaround for clone()-based threads
766            Try to change EUID independently of main thread
767            */
768         if (pw) {
769                 setgroups(0, NULL);
770                 setregid(pw->pw_gid, pw->pw_gid);
771                 setreuid(pw->pw_uid, pw->pw_uid);
772         }
773 }
774
775 void *emit_thread()
776 {
777         struct Flow *flow;
778         void *p;
779         struct timeval now;
780         struct timespec timeout;
781         int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
782
783         p = (void *) &emit_packet + netflow->HeaderSize;
784         timeout.tv_nsec = 0;
785
786         setuser();
787
788         for (;;) {
789                 pthread_mutex_lock(&emit_mutex);
790                 while (!flows_emit) {
791                         gettimeofday(&now, 0);
792                         timeout.tv_sec = now.tv_sec + emit_timeout;
793                         /* Do not wait until emit_packet will filled - it may be too long */
794                         if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
795                                 pthread_mutex_unlock(&emit_mutex);
796                                 goto sendit;
797                         }
798                 }
799                 flow = flows_emit;
800                 flows_emit = flows_emit->next;
801 #if ((DEBUG) & DEBUG_I)
802                 emit_queue--;
803 #endif          
804                 pthread_mutex_unlock(&emit_mutex);
805
806 #ifdef UPTIME_TRICK
807                 if (!emit_count) {
808                         gettime(&start_time);
809                         start_time.sec -= start_time_offset;
810                 }
811 #endif
812                 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
813                 mem_free(flow);
814                 emit_count++;
815 #ifdef PF2_DEBUG
816                 printf("Emit count = %d\n", emit_count);
817                 fflush(stdout);
818 #endif
819                 if (emit_count == netflow->MaxFlows) {
820 sendit:
821                         gettime(&emit_time);
822                         p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
823                         size = netflow->HeaderSize + emit_count * netflow->FlowSize;
824                         /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
825 #ifdef STD_NETFLOW_PDU
826                         if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
827 #endif
828                         peer_rot_cur = 0;
829                         for (i = 0; i < npeers; i++) {
830                                 if (peers[i].type == PEER_FILE) {
831                                         if (netflow->SeqOffset)
832                                                 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
833                                         peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
834                                         ret = write(peers[i].write_fd, emit_packet, size);
835                                         if (ret < size) {
836
837 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
838                                                 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
839                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
840 #endif
841 #undef MESSAGES
842                                         }
843 #if ((DEBUG) & DEBUG_E)
844                                         commaneelse {
845                                                 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
846                                                                 emit_count, i + 1, peers[i].seq);
847                                         }
848 #endif
849                                         peers[i].seq += emit_count;
850
851                                         /* Rate limit */
852                                         if (emit_rate_bytes) {
853                                                 sent += size;
854                                                 delay = sent / emit_rate_bytes;
855                                                 if (delay) {
856                                                         sent %= emit_rate_bytes;
857                                                         timeout.tv_sec = 0;
858                                                         timeout.tv_nsec = emit_rate_delay * delay;
859                                                         while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
860                                                 }
861                                         }
862                                 }
863                                 else
864                                         if (peers[i].type == PEER_MIRROR) goto sendreal;
865                                         else
866                                                 if (peers[i].type == PEER_ROTATE) 
867                                                         if (peer_rot_cur++ == peer_rot_work) {
868 sendreal:
869                                                                 if (netflow->SeqOffset)
870                                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
871                                                                 ret = send(peers[i].write_fd, emit_packet, size, 0);
872                                                                 if (ret < size) {
873 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
874                                                                         my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
875                                                                                         i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
876 #endif
877                                                                 }
878 #if ((DEBUG) & DEBUG_E)
879                                                                 commaneelse {
880                                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
881                                                                                         emit_count, i + 1, peers[i].seq);
882                                                                 }
883 #endif
884                                                                 peers[i].seq += emit_count;
885
886                                                                 /* Rate limit */
887                                                                 if (emit_rate_bytes) {
888                                                                         sent += size;
889                                                                         delay = sent / emit_rate_bytes;
890                                                                         if (delay) {
891                                                                                 sent %= emit_rate_bytes;
892                                                                                 timeout.tv_sec = 0;
893                                                                                 timeout.tv_nsec = emit_rate_delay * delay;
894                                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
895                                                                         }
896                                                                 }
897                                                         }
898                         }
899                         if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
900                         emit_sequence += emit_count;
901                         emit_count = 0;
902 #if ((DEBUG) & DEBUG_I)
903                         emit_pkts++;
904 #endif
905                 }
906         }
907 }       
908
909 void *unpending_thread()
910 {
911         struct timeval now;
912         struct timespec timeout;
913 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
914         char logbuf[256];
915 #endif
916
917         setuser();
918
919         timeout.tv_nsec = 0;
920         pthread_mutex_lock(&unpending_mutex);
921
922         for (;;) {
923                 while (!(pending_tail->flags & FLOW_PENDING)) {
924                         gettimeofday(&now, 0);
925                         timeout.tv_sec = now.tv_sec + unpending_timeout;
926                         pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
927                 }
928
929 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
930                 *logbuf = 0;
931 #endif
932                 if (put_into(pending_tail, COPY_INTO
933 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
934                                         , logbuf
935 #endif
936                             ) < 0) {
937 #if ((DEBUG) & DEBUG_I)
938                         pkts_lost_unpending++;
939 #endif                          
940                 }
941
942 #if ((DEBUG) & DEBUG_U)
943                 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
944 #endif
945
946                 pending_tail->flags = 0;
947                 pending_tail = pending_tail->next;
948 #if ((DEBUG) & DEBUG_I)
949                 pkts_pending_done++;
950 #endif
951         }
952 }
953
954 void *scan_thread()
955 {
956 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
957         char logbuf[256];
958 #endif
959         int i;
960         struct Flow *flow, **flowpp;
961         struct Time now;
962         struct timespec timeout;
963
964         setuser();
965
966         timeout.tv_nsec = 0;
967         pthread_mutex_lock(&scan_mutex);
968
969         for (;;) {
970                 gettime(&now);
971                 timeout.tv_sec = now.sec + scan_interval;
972                 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
973
974                 gettime(&now);
975 #if ((DEBUG) & DEBUG_S)
976                 my_log(LOG_DEBUG, "S: %d", now.sec);
977 #endif
978                 for (i = 0; i < 1 << HASH_BITS ; i++) {
979                         pthread_mutex_lock(&flows_mutex[i]);
980                         flow = flows[i];
981                         flowpp = &flows[i];
982                         while (flow) {
983                                 if (flow->flags & FLOW_FRAG) {
984                                         /* Process fragmented flow */
985                                         if ((now.sec - flow->mtime.sec) > frag_lifetime) {
986                                                 /* Fragmented flow expired - put it into special chain */
987 #if ((DEBUG) & DEBUG_I)
988                                                 flows_fragmented--;
989                                                 flows_total--;
990 #endif
991                                                 *flowpp = flow->next;
992                                                 flow->id = 0;
993                                                 flow->flags &= ~FLOW_FRAG;
994                                                 flow->next = scan_frag_dreg;
995                                                 scan_frag_dreg = flow;
996                                                 flow = *flowpp;
997                                                 continue;
998                                         }
999                                 } else {
1000                                         /* Flow is not frgamented */
1001                                         if ((now.sec - flow->mtime.sec) > inactive_lifetime
1002                                                         || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
1003                                                 /* Flow expired */
1004 #if ((DEBUG) & DEBUG_S)
1005                                                 my_log(LOG_DEBUG, "S: E %x", flow);
1006 #endif
1007 #if ((DEBUG) & DEBUG_I)
1008                                                 flows_total--;
1009 #endif
1010                                                 *flowpp = flow->next;
1011                                                 pthread_mutex_lock(&emit_mutex);
1012                                                 flow->next = flows_emit;
1013                                                 flows_emit = flow;
1014 #if ((DEBUG) & DEBUG_I)
1015                                                 emit_queue++;
1016 #endif                          
1017                                                 pthread_mutex_unlock(&emit_mutex);
1018                                                 flow = *flowpp;
1019                                                 continue;
1020                                         }
1021                                 }
1022                                 flowpp = &flow->next;
1023                                 flow = flow->next;
1024                         } /* chain loop */
1025                         pthread_mutex_unlock(&flows_mutex[i]);
1026                 } /* hash loop */
1027                 if (flows_emit) pthread_cond_signal(&emit_cond);
1028
1029                 while (scan_frag_dreg) {
1030                         flow = scan_frag_dreg;
1031                         scan_frag_dreg = flow->next;
1032 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1033                         *logbuf = 0;
1034 #endif
1035                         put_into(flow, MOVE_INTO
1036 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1037                                         , logbuf
1038 #endif
1039                                 );
1040 #if ((DEBUG) & DEBUG_S)
1041                         my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
1042 #endif
1043                 }
1044         }
1045 }
1046
1047 void *cap_thread()
1048 {
1049         struct ulog_packet_msg *ulog_msg;
1050         struct ip *nl;
1051         void *tl;
1052         struct Flow *flow;
1053         int len, off_frag, psize;
1054 #if ((DEBUG) & DEBUG_C)
1055         char buf[64];
1056         char logbuf[256];
1057 #endif
1058
1059         setuser();
1060
1061         while (!killed) {
1062                 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1063                 if (len <= 0) {
1064                         my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1065                         continue;
1066                 }
1067                 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1068
1069 #if ((DEBUG) & DEBUG_C)
1070                         sprintf(logbuf, "C: %d", ulog_msg->data_len);
1071 #endif
1072
1073                         nl = (void *) &ulog_msg->payload;
1074                         psize = ulog_msg->data_len;
1075
1076                         /* Sanity check */
1077                         if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1078 #if ((DEBUG) & DEBUG_C)
1079                                 strcat(logbuf, " U");
1080                                 my_log(LOG_DEBUG, "%s", logbuf);
1081 #endif
1082 #if ((DEBUG) & DEBUG_I)
1083                                 pkts_ignored++;
1084 #endif
1085                                 continue;
1086                         }
1087
1088                         if (pending_head->flags) {
1089 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1090                                 my_log(LOG_ERR,
1091 # if ((DEBUG) & DEBUG_C)
1092                                                 "%s %s %s", logbuf,
1093 # else
1094                                                 "%s %s",
1095 # endif
1096                                                 "pending queue full:", "packet lost");
1097 #endif
1098 #if ((DEBUG) & DEBUG_I)
1099                                 pkts_lost_capture++;
1100 #endif
1101                                 goto done;
1102                         }
1103
1104 #if ((DEBUG) & DEBUG_I)
1105                         pkts_total++;
1106 #endif
1107
1108                         flow = pending_head;
1109
1110                         /* ?FIXME? Add sanity check for ip_len? */
1111                         flow->size = ntohs(nl->ip_len);
1112 #if ((DEBUG) & DEBUG_I)
1113                         size_total += flow->size;
1114 #endif
1115
1116                         flow->sip = nl->ip_src;
1117                         flow->dip = nl->ip_dst;
1118                         flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1119                         if ((flow->dip.s_addr == inet_addr("64.34.177.39")) || (flow->sip.s_addr == inet_addr("64.34.177.39"))) {
1120                                 my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->tos);
1121                         }
1122                         flow->iif = snmp_index(ulog_msg->indev_name);
1123                         flow->oif = snmp_index(ulog_msg->outdev_name);
1124                         flow->proto = nl->ip_p;
1125                         flow->id = 0;
1126                         flow->tcp_flags = 0;
1127                         flow->pkts = 1;
1128                         flow->sizeF = 0;
1129                         flow->sizeP = 0;
1130                         /* Packets captured from OUTPUT table didn't contains valid timestamp */
1131                         if (ulog_msg->timestamp_sec) {
1132                                 flow->ctime.sec = ulog_msg->timestamp_sec;
1133                                 flow->ctime.usec = ulog_msg->timestamp_usec;
1134                         } else gettime(&flow->ctime);
1135                         flow->mtime = flow->ctime;
1136
1137                         off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1138
1139                         /*
1140                            Offset (from network layer) to transport layer header/IP data
1141                            IOW IP header size ;-)
1142
1143                            ?FIXME?
1144                            Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1145                            */
1146                         off_tl = nl->ip_hl << 2;
1147                         tl = (void *) nl + off_tl;
1148
1149                         /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1150                         flow->sizeF = ntohs(nl->ip_len) - off_tl;
1151                         psize -= off_tl;
1152                         if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1153                         if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1154
1155                         if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1156                                 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1157 #if ((DEBUG) & DEBUG_C)
1158                                 strcat(logbuf, " F");
1159 #endif
1160 #if ((DEBUG) & DEBUG_I)
1161                                 pkts_total_fragmented++;
1162 #endif
1163                                 flow->flags |= FLOW_FRAG;
1164                                 flow->id = nl->ip_id;
1165
1166                                 if (!(ntohs(nl->ip_off) & IP_MF)) {
1167                                         /* Packet whith IP_MF contains information about whole datagram size */
1168                                         flow->flags |= FLOW_LASTFRAG;
1169                                         /* size = frag_offset*8 + data_size */
1170                                         flow->sizeP = off_frag + flow->sizeF;
1171                                 }
1172                         }
1173
1174 #if ((DEBUG) & DEBUG_C)
1175                         sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1176                         strcat(logbuf, buf);
1177                         sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1178                         strcat(logbuf, buf);
1179 #endif
1180
1181                         /*
1182                            Fortunately most interesting transport layer information fit
1183                            into first 8 bytes of IP data field (minimal nonzero size).
1184                            Thus we don't need actual packet reassembling to build whole
1185                            transport layer data. We only check the fragment offset for
1186                            zero value to find packet with this information.
1187                            */
1188                         if (!off_frag && psize >= 8) {
1189                                 switch (flow->proto) {
1190                                         case IPPROTO_TCP:
1191                                         case IPPROTO_UDP:
1192                                                 flow->sp = ((struct udphdr *)tl)->uh_sport;
1193                                                 flow->dp = ((struct udphdr *)tl)->uh_dport;
1194                                                 goto tl_known;
1195
1196 #ifdef ICMP_TRICK
1197                                         case IPPROTO_ICMP:
1198                                                 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1199                                                 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1200                                                 goto tl_known;
1201 #endif
1202 #ifdef ICMP_TRICK_CISCO
1203                                         case IPPROTO_ICMP:
1204                                                 flow->dp = *((int32_t *) tl);
1205                                                 goto tl_known;
1206 #endif
1207
1208                                         default:
1209                                                 /* Unknown transport layer */
1210 #if ((DEBUG) & DEBUG_C)
1211                                                 strcat(logbuf, " U");
1212 #endif
1213                                                 flow->sp = 0;
1214                                                 flow->dp = 0;
1215                                                 break;
1216
1217 tl_known:
1218 #if ((DEBUG) & DEBUG_C)
1219                                                 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1220                                                 strcat(logbuf, buf);
1221 #endif
1222                                                 flow->flags |= FLOW_TL;
1223                                 }
1224                         }
1225
1226                         /* Check for tcp flags presence (including CWR and ECE). */
1227                         if (flow->proto == IPPROTO_TCP
1228                                         && off_frag < 16
1229                                         && psize >= 16 - off_frag) {
1230                                 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1231 #if ((DEBUG) & DEBUG_C)
1232                                 sprintf(buf, " TCP:%x", flow->tcp_flags);
1233                                 strcat(logbuf, buf);
1234 #endif
1235                         }
1236
1237 #if ((DEBUG) & DEBUG_C)
1238                         sprintf(buf, " => %x", (unsigned) flow);
1239                         strcat(logbuf, buf);
1240                         my_log(LOG_DEBUG, "%s", logbuf);
1241 #endif
1242
1243 #if ((DEBUG) & DEBUG_I)
1244                         pkts_pending++;
1245                         pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1246                         if (pending_queue_trace < pending_queue_trace_candidate)
1247                                 pending_queue_trace = pending_queue_trace_candidate;
1248 #endif
1249
1250                         /* Flow complete - inform unpending_thread() about it */
1251                         pending_head->flags |= FLOW_PENDING;
1252                         pending_head = pending_head->next;
1253 done:
1254                         pthread_cond_signal(&unpending_cond);
1255                 }
1256         }
1257         return 0;
1258 }
1259
1260 /* Copied out of CoDemux */
1261
1262 static int init_daemon() {
1263         pid_t pid;
1264         FILE *pidfile;
1265
1266         pidfile = fopen(PIDFILE, "w");
1267         if (pidfile == NULL) {
1268                 my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
1269         }
1270
1271         if ((pid = fork()) < 0) {
1272                 fclose(pidfile);
1273                 my_log(LOG_ERR, "Could not fork!\n");
1274                 return(-1);
1275         }
1276         else if (pid != 0) {
1277                 /* i'm the parent, writing down the child pid  */
1278                 fprintf(pidfile, "%u\n", pid);
1279                 fclose(pidfile);
1280                 exit(0);
1281         }
1282
1283         /* close the pid file */
1284         fclose(pidfile);
1285
1286         /* routines for any daemon process
1287            1. create a new session 
1288            2. change directory to the root
1289            3. change the file creation permission 
1290            */
1291         setsid();
1292         chdir("/var/local/fprobe");
1293         umask(0);
1294
1295         return(0);
1296 }
1297
1298 int main(int argc, char **argv)
1299 {
1300         char errpbuf[512];
1301         char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1302         int c, i, write_fd, memory_limit = 0;
1303         struct addrinfo hints, *res;
1304         struct sockaddr_in saddr;
1305         pthread_attr_t tattr;
1306         struct sigaction sigact;
1307         static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1308         struct timeval timeout;
1309
1310         sched_min = sched_get_priority_min(SCHED);
1311         sched_max = sched_get_priority_max(SCHED);
1312
1313         memset(&saddr, 0 , sizeof(saddr));
1314         memset(&hints, 0 , sizeof(hints));
1315         hints.ai_flags = AI_PASSIVE;
1316         hints.ai_family = AF_INET;
1317         hints.ai_socktype = SOCK_DGRAM;
1318
1319         /* Process command line options */
1320
1321         opterr = 0;
1322         while ((c = my_getopt(argc, argv, parms)) != -1) {
1323                 switch (c) {
1324                         case '?':
1325                                 usage();
1326
1327                         case 'h':
1328                                 usage();
1329                 }
1330         }
1331
1332         if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1333         if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1334         if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1335         if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1336         if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1337         if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1338         if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1339         if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1340         if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1341         if (parms[nflag].count) {
1342                 switch (atoi(parms[nflag].arg)) {
1343                         case 1:
1344                                 netflow = &NetFlow1;
1345                                 break;
1346
1347                         case 5:
1348                                 break;
1349
1350                         case 7:
1351                                 netflow = &NetFlow7;
1352                                 break;
1353
1354                         default:
1355                                 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1356                                 exit(1);
1357                 }
1358         }
1359         if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1360         if (parms[lflag].count) {
1361                 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1362                         *log_suffix++ = 0;
1363                         if (*log_suffix) {
1364                                 sprintf(errpbuf, "[%s]", log_suffix);
1365                                 strcat(ident, errpbuf);
1366                         }
1367                 }
1368                 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1369                 if (log_suffix) *--log_suffix = ':';
1370         }
1371         if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1372 err_malloc:
1373                 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1374                 exit(1);
1375         }
1376         sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1377         if (parms[qflag].count) {
1378                 pending_queue_length = atoi(parms[qflag].arg);
1379                 if (pending_queue_length < 1) {
1380                         fprintf(stderr, "Illegal %s\n", "pending queue length");
1381                         exit(1);
1382                 }
1383         }
1384         if (parms[rflag].count) {
1385                 schedp.sched_priority = atoi(parms[rflag].arg);
1386                 if (schedp.sched_priority
1387                                 && (schedp.sched_priority < sched_min
1388                                         || schedp.sched_priority > sched_max)) {
1389                         fprintf(stderr, "Illegal %s\n", "realtime priority");
1390                         exit(1);
1391                 }
1392         }
1393         if (parms[Bflag].count) {
1394                 sockbufsize = atoi(parms[Bflag].arg) << 10;
1395         }
1396         if (parms[bflag].count) {
1397                 bulk_quantity = atoi(parms[bflag].arg);
1398                 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1399                         fprintf(stderr, "Illegal %s\n", "bulk size");
1400                         exit(1);
1401                 }
1402         }
1403         if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1404         if (parms[Xflag].count) {
1405                 for(i = 0; parms[Xflag].arg[i]; i++)
1406                         if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1407                 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1408                         goto err_malloc;
1409                 rule = strtok(parms[Xflag].arg, ":");
1410                 for (i = 0; rule; i++) {
1411                         snmp_rules[i].len = strlen(rule);
1412                         if (snmp_rules[i].len > IFNAMSIZ) {
1413                                 fprintf(stderr, "Illegal %s\n", "interface basename");
1414                                 exit(1);
1415                         }
1416                         strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1417                         if (!*(rule - 1)) *(rule - 1) = ',';
1418                         rule = strtok(NULL, ",");
1419                         if (!rule) {
1420                                 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1421                                 exit(1);
1422                         }
1423                         snmp_rules[i].base = atoi(rule);
1424                         *(rule - 1) = ':';
1425                         rule = strtok(NULL, ":");
1426                 }
1427                 nsnmp_rules = i;
1428         }
1429         if (parms[tflag].count)
1430                 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1431         if (parms[aflag].count) {
1432                 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1433 bad_lhost:
1434                         fprintf(stderr, "Illegal %s\n", "source address");
1435                         exit(1);
1436                 } else {
1437                         saddr = *((struct sockaddr_in *) res->ai_addr);
1438                         freeaddrinfo(res);
1439                 }
1440         }
1441         if (parms[uflag].count) 
1442                 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1443                         fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1444                         exit(1);
1445                 }
1446
1447
1448         /* Process collectors parameters. Brrrr... :-[ */
1449
1450         npeers = argc - optind;
1451         if (npeers > 1) {
1452                 /* Send to remote Netflow collector */
1453                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1454                 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1455                         dhost = argv[i];
1456                         if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1457                         *dport++ = 0;
1458                         if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1459                                 fprintf(stderr, "socket(): %s\n", strerror(errno));
1460                                 exit(1);
1461                         }
1462                         peers[npeers].write_fd = write_fd;
1463                         peers[npeers].type = PEER_MIRROR;
1464                         peers[npeers].laddr = saddr;
1465                         peers[npeers].seq = 0;
1466                         if ((lhost = strchr(dport, '/'))) {
1467                                 *lhost++ = 0;
1468                                 if ((type = strchr(lhost, '/'))) {
1469                                         *type++ = 0;
1470                                         switch (*type) {
1471                                                 case 0:
1472                                                 case 'm':
1473                                                         break;
1474
1475                                                 case 'r':
1476                                                         peers[npeers].type = PEER_ROTATE;
1477                                                         npeers_rot++;
1478                                                         break;
1479
1480                                                 default:
1481                                                         goto bad_collector;
1482                                         }
1483                                 }
1484                                 if (*lhost) {
1485                                         if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1486                                         peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1487                                         freeaddrinfo(res);
1488                                 }
1489                         }
1490                         if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1491                                                 sizeof(struct sockaddr_in))) {
1492                                 fprintf(stderr, "bind(): %s\n", strerror(errno));
1493                                 exit(1);
1494                         }
1495                         if (getaddrinfo(dhost, dport, &hints, &res)) {
1496 bad_collector:
1497                                 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1498                                 exit(1);
1499                         }
1500                         peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1501                         freeaddrinfo(res);
1502                         if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1503                                                 sizeof(struct sockaddr_in))) {
1504                                 fprintf(stderr, "connect(): %s\n", strerror(errno));
1505                                 exit(1);
1506                         }
1507
1508                         /* Restore command line */
1509                         if (type) *--type = '/';
1510                         if (lhost) *--lhost = '/';
1511                         *--dport = ':';
1512                 }
1513         }
1514         else if (parms[fflag].count) {
1515                 // log into a file
1516                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1517                 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1518                 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1519
1520                 peers[npeers].write_fd = START_VALUE;
1521                 peers[npeers].type = PEER_FILE;
1522                 peers[npeers].seq = 0;
1523
1524                 get_cur_epoch();
1525                 npeers++;
1526         }
1527         else 
1528                 usage();
1529
1530
1531         if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1532         ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1533         if (!ulog_handle) {
1534                 fprintf(stderr, "libipulog initialization error: %s",
1535                                 ipulog_strerror(ipulog_errno));
1536                 exit(1);
1537         }
1538         if (sockbufsize)
1539                 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1540                                         &sockbufsize, sizeof(sockbufsize)) < 0)
1541                         fprintf(stderr, "setsockopt(): %s", strerror(errno));
1542
1543         /* Daemonize (if log destination stdout-free) */
1544
1545         my_log_open(ident, verbosity, log_dest);
1546
1547         init_daemon();
1548
1549         if (!(log_dest & 2)) {
1550                 /* Crash-proofing - Sapan*/
1551                 while (1) {
1552                         int pid=fork();
1553                         if (pid==-1) {
1554                                 fprintf(stderr, "fork(): %s", strerror(errno));
1555                                 exit(1);
1556                         }
1557                         else if (pid==0) {
1558                                 setsid();
1559                                 freopen("/dev/null", "r", stdin);
1560                                 freopen("/dev/null", "w", stdout);
1561                                 freopen("/dev/null", "w", stderr);
1562                                 break;
1563                         }
1564                         else {
1565                                 while (wait3(NULL,0,NULL) < 1);
1566                         }
1567                 }
1568         } else {
1569                 setvbuf(stdout, (char *)0, _IONBF, 0);
1570                 setvbuf(stderr, (char *)0, _IONBF, 0);
1571         }
1572
1573         pid = getpid();
1574         sprintf(errpbuf, "[%ld]", (long) pid);
1575         strcat(ident, errpbuf);
1576
1577         /* Initialization */
1578
1579         hash_init(); /* Actually for crc16 only */
1580         mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1581         for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1582
1583 #ifdef UPTIME_TRICK
1584         /* Hope 12 days is enough :-/ */
1585         start_time_offset = 1 << 20;
1586
1587         /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1588 #endif
1589         gettime(&start_time);
1590
1591         /*
1592            Build static pending queue as circular buffer.
1593            */
1594         if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1595         pending_tail = pending_head;
1596         for (i = pending_queue_length - 1; i--;) {
1597                 if (!(pending_tail->next = mem_alloc())) {
1598 err_mem_alloc:
1599                         my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1600                         exit(1);
1601                 }
1602                 pending_tail = pending_tail->next;
1603         }
1604         pending_tail->next = pending_head;
1605         pending_tail = pending_head;
1606
1607         sigemptyset(&sig_mask);
1608         sigact.sa_handler = &sighandler;
1609         sigact.sa_mask = sig_mask;
1610         sigact.sa_flags = 0;
1611         sigaddset(&sig_mask, SIGTERM);
1612         sigaction(SIGTERM, &sigact, 0);
1613 #if ((DEBUG) & DEBUG_I)
1614         sigaddset(&sig_mask, SIGUSR1);
1615         sigaction(SIGUSR1, &sigact, 0);
1616 #endif
1617         if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1618                 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1619                 exit(1);
1620         }
1621
1622         my_log(LOG_INFO, "Starting %s...", VERSION);
1623
1624         if (parms[cflag].count) {
1625                 if (chdir(parms[cflag].arg) || chroot(".")) {
1626                         my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1627                         exit(1);
1628                 }
1629         }
1630
1631         schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1632         pthread_attr_init(&tattr);
1633         for (i = 0; i < THREADS - 1; i++) {
1634                 if (schedp.sched_priority > 0) {
1635                         if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1636                                         (pthread_attr_setschedparam(&tattr, &schedp))) {
1637                                 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1638                                 exit(1);
1639                         }
1640                 }
1641                 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1642                         my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1643                         exit(1);
1644                 }
1645                 pthread_detach(thid);
1646                 schedp.sched_priority++;
1647         }
1648
1649         if (pw) {
1650                 if (setgroups(0, NULL)) {
1651                         my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1652                         exit(1);
1653                 }
1654                 if (setregid(pw->pw_gid, pw->pw_gid)) {
1655                         my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1656                         exit(1);
1657                 }
1658                 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1659                         my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1660                         exit(1);
1661                 }
1662         }
1663
1664         if (!(pidfile = fopen(pidfilepath, "w")))
1665                 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1666         else {
1667                 fprintf(pidfile, "%ld\n", (long) pid);
1668                 fclose(pidfile);
1669         }
1670
1671         my_log(LOG_INFO, "pid: %d", pid);
1672         my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1673                         "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1674                         ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1675                         netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1676                         memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1677                         emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1678                         parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1679         for (i = 0; i < nsnmp_rules; i++) {
1680                 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1681                                 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1682         }
1683         for (i = 0; i < npeers; i++) {
1684                 switch (peers[i].type) {
1685                         case PEER_MIRROR:
1686                                 c = 'm';
1687                                 break;
1688                         case PEER_ROTATE:
1689                                 c = 'r';
1690                                 break;
1691                 }
1692                 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1693                 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1694                                 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1695         }
1696
1697         pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1698
1699         timeout.tv_usec = 0;
1700         while (!killed
1701                         || (total_elements - free_elements - pending_queue_length)
1702                         || emit_count
1703                         || pending_tail->flags) {
1704
1705                 if (!sigs) {
1706                         timeout.tv_sec = scan_interval;
1707                         select(0, 0, 0, 0, &timeout);
1708                 }
1709
1710                 if (sigs & SIGTERM_MASK && !killed) {
1711                         sigs &= ~SIGTERM_MASK;
1712                         my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1713                         scan_interval = 1;
1714                         frag_lifetime = -1;
1715                         active_lifetime = -1;
1716                         inactive_lifetime = -1;
1717                         emit_timeout = 1;
1718                         unpending_timeout = 1;
1719                         killed = 1;
1720                         pthread_cond_signal(&scan_cond);
1721                         pthread_cond_signal(&unpending_cond);
1722                 }
1723
1724 #if ((DEBUG) & DEBUG_I)
1725                 if (sigs & SIGUSR1_MASK) {
1726                         sigs &= ~SIGUSR1_MASK;
1727                         info_debug();
1728                 }
1729 #endif
1730         }
1731         remove(pidfilepath);
1732 #if ((DEBUG) & DEBUG_I)
1733         info_debug();
1734 #endif
1735         my_log(LOG_INFO, "Done.");
1736 #ifdef WALL
1737         return 0;
1738 #endif
1739 }