Merged 4.1 branch
[fprobe-ulog.git] / src / fprobe-ulog.c
1 /*
2         Copyright (C) Slava Astashonok <sla@0n.ru>
3
4         This program is free software; you can redistribute it and/or
5         modify it under the terms of the GNU General Public License.
6
7         $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
8
9                         Sapan Bhatia <sapanb@cs.princeton.edu> 
10                         
11         7/11/2007       Added data collection (-f) functionality, xid support in the header and log file
12                         rotation.
13
14         15/11/2007      Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
15
16 */
17
18 #include <common.h>
19
20 /* stdout, stderr, freopen() */
21 #include <stdio.h>
22
23 /* atoi(), exit() */
24 #include <stdlib.h>
25
26 /* getopt(), alarm(), getpid(), setsid(), chdir() */
27 #include <unistd.h>
28
29 /* strerror() */
30 #include <string.h>
31
32 /* sig*() */
33 #include <signal.h>
34
35 /* statfs() */
36
37 #include <sys/vfs.h>
38
39 #include <libipulog/libipulog.h>
/*
 * Local redeclaration of libipulog's handle type, presumably so this file can
 * reach its members directly.
 * NOTE(review): libipulog.h apparently leaves this type incomplete (otherwise
 * this would be a redefinition). The members below must match the library's
 * own definition exactly - same order, same types - so re-check this after
 * any libipulog upgrade.
 */
struct ipulog_handle {
	int fd;
	u_int8_t blocking;
	struct sockaddr_nl local, peer;	/* local and peer netlink addresses */
	struct nlmsghdr *last_nlhdr;
};
47
48 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
49 #include <sys/types.h>
50 #include <netinet/in_systm.h>
51 #include <sys/socket.h>
52 #include <netinet/in.h>
53 #include <arpa/inet.h>
54 #include <netinet/ip.h>
55 #include <netinet/tcp.h>
56 #include <netinet/udp.h>
57 #include <netinet/ip_icmp.h>
58 #include <net/if.h>
59
60 #include <sys/param.h>
61 #include <pwd.h>
62 #ifdef OS_LINUX
63 #include <grp.h>
64 #endif
65
66 /* pthread_*() */
67 #include <pthread.h>
68
69 /* errno */
70 #include <errno.h>
71
72 /* getaddrinfo() */
73 #include <netdb.h>
74
75 /* nanosleep() */
76 #include <time.h>
77
78 /* gettimeofday() */
79 #include <sys/time.h>
80
81 /* scheduling */
82 #include <sched.h>
83
84 /* select() (POSIX)*/
85 #include <sys/select.h>
86
87 /* open() */
88 #include <sys/stat.h>
89 #include <fcntl.h>
90
91 #include <fprobe-ulog.h>
92 #include <my_log.h>
93 #include <my_getopt.h>
94 #include <netflow.h>
95 #include <hash.h>
96 #include <mem.h>
97
98 #define PIDFILE "/var/log/fprobe-ulog.pid"
99
100 enum {
101         aflag,
102         Bflag,
103         bflag,
104         cflag,
105         dflag,
106         Dflag,
107         eflag,
108         Eflag,
109         fflag,
110         gflag,
111         hflag,
112         lflag,
113         mflag,
114         Mflag,
115         nflag,
116         qflag,
117         rflag,
118         sflag,
119         tflag,
120         Tflag,
121         Uflag,
122         uflag,
123         vflag,
124         Wflag,
125         Xflag,
126 };
127
128 static struct getopt_parms parms[] = {
129         {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
130         {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
131         {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
132         {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
133         {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
134         {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
135         {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
136         {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
137         {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
138         {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
139         {'h', 0, 0, 0},
140         {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
141         {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
142         {'M', 0, 0, 0},
143         {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
144         {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
145         {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
146         {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
147         {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
148         {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
149         {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
150         {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
151         {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
152         {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
153         {0, 0, 0, 0}
154 };
155
156 extern char *optarg;
157 extern int optind, opterr, optopt;
158 extern int errno;
159
160 extern struct NetFlow NetFlow1;
161 extern struct NetFlow NetFlow5;
162 extern struct NetFlow NetFlow7;
163
164 #define START_VALUE -5
165 #define mark_is_tos parms[Mflag].count
166 static unsigned scan_interval = 5;
167 static int min_free = 0;
168 static int frag_lifetime = 30;
169 static int inactive_lifetime = 60;
170 static int active_lifetime = 300;
171 static int sockbufsize;
172 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
173 #if (MEM_BITS == 0) || (MEM_BITS == 16)
174 #define BULK_QUANTITY 10000
175 #else
176 #define BULK_QUANTITY 200
177 #endif
178
179 static unsigned epoch_length=60, log_epochs=1; 
180 static unsigned cur_epoch=0,prev_uptime=0;
181
182 static unsigned bulk_quantity = BULK_QUANTITY;
183 static unsigned pending_queue_length = 100;
184 static struct NetFlow *netflow = &NetFlow5;
185 static unsigned verbosity = 6;
186 static unsigned log_dest = MY_LOG_SYSLOG;
187 static struct Time start_time;
188 static long start_time_offset;
189 static int off_tl;
190 /* From mem.c */
191 extern unsigned total_elements;
192 extern unsigned free_elements;
193 extern unsigned total_memory;
194 #if ((DEBUG) & DEBUG_I)
195 static unsigned emit_pkts, emit_queue;
196 static uint64_t size_total;
197 static unsigned pkts_total, pkts_total_fragmented;
198 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
199 static unsigned pkts_pending, pkts_pending_done;
200 static unsigned pending_queue_trace, pending_queue_trace_candidate;
201 static unsigned flows_total, flows_fragmented;
202 #endif
203 static unsigned emit_count;
204 static uint32_t emit_sequence;
205 static unsigned emit_rate_bytes, emit_rate_delay;
206 static struct Time emit_time;
207 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
208 static pthread_t thid;
209 static sigset_t sig_mask;
210 static struct sched_param schedp;
211 static int sched_min, sched_max;
212 static int npeers, npeers_rot;
213 static struct peer *peers;
214 static int sigs;
215
216 static struct Flow *flows[1 << HASH_BITS];
217 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
218
219 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
220 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
221
222 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
223 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
224 static struct Flow *pending_head, *pending_tail;
225 static struct Flow *scan_frag_dreg;
226
227 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
228 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
229 static struct Flow *flows_emit;
230
231 static char ident[256] = "fprobe-ulog";
232 static FILE *pidfile;
233 static char *pidfilepath;
234 static pid_t pid;
235 static int killed;
236 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
237 static struct ipulog_handle *ulog_handle;
238 static uint32_t ulog_gmask = 1;
239 static char *cap_buf;
240 static int nsnmp_rules;
241 static struct snmp_rule *snmp_rules;
242 static struct passwd *pw = 0;
243
244 void usage()
245 {
246         fprintf(stdout,
247                 "fprobe-ulog: a NetFlow probe. Version %s\n"
248                 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
249                 "\n"
250                 "-h\t\tDisplay this help\n"
251                 "-U <mask>\tULOG group bitwise mask [1]\n"
252                 "-s <seconds>\tHow often scan for expired flows [5]\n"
253                 "-g <seconds>\tFragmented flow lifetime [30]\n"
254                 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
255                 "-f <filename>\tLog flow data in a file\n"
256                 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
257                 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
258                 "-a <address>\tUse <address> as source for NetFlow flow\n"
259                 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
260                 "-M\t\tUse netfilter mark value as ToS flag\n"
261                 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
262                 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
263                 "-q <flows>\tPending queue length [100]\n"
264                 "-B <kilobytes>\tKernel capture buffer size [0]\n"
265                 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
266                 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
267                 "-c <directory>\tDirectory to chroot to\n"
268                 "-u <user>\tUser to run as\n"
269                 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
270                 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
271                 "-y <remote:port>\tAddress of the NetFlow collector\n"
272                 "-f <writable file>\tFile to write data into\n"
273                 "-T <n>\tRotate log file every n epochs\n"
274                 "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
275                 "-E <[1..60]>\tSize of an epoch in minutes\n"
276                 "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
277                 ,
278                 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
279         exit(0);
280 }
281
282 #if ((DEBUG) & DEBUG_I)
283 void info_debug()
284 {
285         my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
286                 pkts_total, pkts_total_fragmented, size_total,
287                 pkts_pending - pkts_pending_done, pending_queue_trace);
288         my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
289                 pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
290         my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
291                 flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
292         my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
293                 total_elements, free_elements, total_memory);
294 }
295 #endif
296
297 void sighandler(int sig)
298 {
299         switch (sig) {
300                 case SIGTERM:
301                         sigs |= SIGTERM_MASK;
302                         break;
303 #if ((DEBUG) & DEBUG_I)
304                 case SIGUSR1:
305                         sigs |= SIGUSR1_MASK;
306                         break;
307 #endif
308         }
309 }
310
311 void gettime(struct Time *now)
312 {
313         struct timeval t;
314
315         gettimeofday(&t, 0);
316         now->sec = t.tv_sec;
317         now->usec = t.tv_usec;
318 }
319
320
321 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
322 {
323         return (t1->sec - t2->sec)/60;
324 }
325
326 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
327 {
328         return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
329 }
330
331 /* Uptime in miliseconds */
332 uint32_t getuptime(struct Time *t)
333 {
334         /* Maximum uptime is about 49/2 days */
335         return cmpmtime(t, &start_time);
336 }
337
338 /* Uptime in minutes */
339 uint32_t getuptime_minutes(struct Time *t)
340 {
341         /* Maximum uptime is about 49/2 days */
342         return cmpMtime(t, &start_time);
343 }
344
345 hash_t hash_flow(struct Flow *flow)
346 {
347         if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
348         else return hash(flow, sizeof(struct Flow_TL));
349 }
350
351 uint16_t snmp_index(char *name) {
352         uint32_t i;
353
354         if (!*name) return 0;
355
356         for (i = 0; (int) i < nsnmp_rules; i++) {
357                 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
358                 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
359         }
360
361         if ((i = if_nametoindex(name))) return i;
362
363         return -1;
364 }
365
366 inline void copy_flow(struct Flow *src, struct Flow *dst)
367 {
368         dst->iif = src->iif;
369         dst->oif = src->oif;
370         dst->sip = src->sip;
371         dst->dip = src->dip;
372         dst->tos = src->tos;
373         dst->proto = src->proto;
374         dst->tcp_flags = src->tcp_flags;
375         dst->id = src->id;
376         dst->sp = src->sp;
377         dst->dp = src->dp;
378         dst->pkts = src->pkts;
379         dst->size = src->size;
380         dst->sizeF = src->sizeF;
381         dst->sizeP = src->sizeP;
382         dst->ctime = src->ctime;
383         dst->mtime = src->mtime;
384         dst->flags = src->flags;
385 }
386
387 void get_cur_epoch() {
388         int fd;
389         fd = open("/tmp/fprobe_last_epoch",O_RDONLY);
390         if (fd != -1) {
391                 char snum[7];
392                 ssize_t len;
393                 len = read(fd, snum, sizeof(snum)-1);
394                 if (len != -1) {
395                         snum[len]='\0';
396                         sscanf(snum,"%d",&cur_epoch);
397                         cur_epoch++; /* Let's not stone the last epoch */
398                         close(fd);
399                 }
400         }
401         return;
402 }
403
404
405 void update_cur_epoch_file(int n) {
406         int fd, len;
407         char snum[7];
408         len=snprintf(snum,6,"%d",n);
409         fd = open("/tmp/fprobe_last_epoch",O_WRONLY|O_CREAT|O_TRUNC);
410         if (fd == -1) {
411                 my_log(LOG_ERR, "open() failed: /tmp/fprobe_last_epoch.The next restart will resume logging from epoch id 0.");
412                 return;
413         }
414         write(fd, snum, len);
415         close(fd);
416 }
417
418 unsigned get_log_fd(char *fname, int cur_fd) {
419         struct Time now;
420         unsigned cur_uptime;
421         /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
422          * doesn't solve the problem */
423
424         struct statfs statfs;
425         int ret_fd;
426         gettime(&now);
427         cur_uptime = getuptime_minutes(&now);
428
429
430         if (cur_fd!=START_VALUE) {
431                 if (fstatfs(cur_fd, &statfs) == -1) {
432                         my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
433                 }
434                 else {
435                         if (min_free && (statfs.f_bavail < min_free)) 
436                                 switch(cur_epoch) {
437                                         case 0: /* Uh oh. Our first file filled up all of the free space. Just bail out. */
438                                                 my_log(LOG_ERR, "The first epoch filled up all the free space on disk. Bailing out.");
439                                                 exit(1);
440                                         default:
441                                                 my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
442                                                 cur_epoch = -1;
443                                 }
444                 }
445         }
446
447         /* Epoch length in minutes */
448         if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
449                 char nextname[MAX_PATH_LEN];
450                 int write_fd;
451                 prev_uptime = cur_uptime;
452                 cur_epoch = (cur_epoch + 1) % log_epochs;
453                 if (cur_fd>0)
454                         close(cur_fd);
455                 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
456                 if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC)) < 0) {
457                         my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
458                         exit(1);
459                 }
460                 update_cur_epoch_file(cur_epoch);
461                 ret_fd = write_fd;
462         }
463         else {
464                 ret_fd = cur_fd;
465         }
466         return(ret_fd);
467 }
468
469 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
470 {
471         struct Flow **flowpp;
472
473 #ifdef WALL
474         flowpp = 0;
475 #endif
476
477         if (prev) flowpp = *prev;
478
479         while (where) {
480                 if (where->sip.s_addr == what->sip.s_addr
481                         && where->dip.s_addr == what->dip.s_addr
482                         && where->proto == what->proto) {
483                         switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
484                                 case 0:
485                                         /* Both unfragmented */
486                                         if ((what->sp == where->sp)
487                                                 && (what->dp == where->dp)) goto done;
488                                         break;
489                                 case 2:
490                                         /* Both fragmented */
491                                         if (where->id == what->id) goto done;
492                                         break;
493                         }
494                 }
495                 flowpp = &where->next;
496                 where = where->next;
497         }
498 done:
499         if (prev) *prev = flowpp;
500         return where;
501 }
502
503 int put_into(struct Flow *flow, int flag
504 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
505         , char *logbuf
506 #endif
507 )
508 {
509         int ret = 0;
510         hash_t h;
511         struct Flow *flown, **flowpp;
512 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
513         char buf[64];
514 #endif
515
516         h = hash_flow(flow);
517 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
518         sprintf(buf, " %x H:%04x", (unsigned) flow, h);
519         strcat(logbuf, buf);
520 #endif
521         pthread_mutex_lock(&flows_mutex[h]);
522         flowpp = &flows[h];
523         if (!(flown = find(flows[h], flow, &flowpp))) {
524                 /* No suitable flow found - add */
525                 if (flag == COPY_INTO) {
526                         if ((flown = mem_alloc())) {
527                                 copy_flow(flow, flown);
528                                 flow = flown;
529                         } else {
530 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
531                                 my_log(LOG_ERR, "%s %s. %s",
532                                         "mem_alloc():", strerror(errno), "packet lost");
533 #endif
534                                 return -1;
535                         }
536                 }
537                 flow->next = flows[h];
538                 flows[h] = flow;
539 #if ((DEBUG) & DEBUG_I)
540                 flows_total++;
541                 if (flow->flags & FLOW_FRAG) flows_fragmented++;
542 #endif
543 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
544                 if (flown) {
545                         sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
546                         strcat(logbuf, buf);
547                 }
548 #endif
549         } else {
550                 /* Found suitable flow - update */
551 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
552                 sprintf(buf, " +> %x", (unsigned) flown);
553                 strcat(logbuf, buf);
554 #endif
555                 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
556                         flown->mtime = flow->mtime;
557                 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
558                         flown->ctime = flow->ctime;
559                 flown->tcp_flags |= flow->tcp_flags;
560                 flown->size += flow->size;
561                 flown->pkts += flow->pkts;
562                 if (flow->flags & FLOW_FRAG) {
563                         /* Fragmented flow require some additional work */
564                         if (flow->flags & FLOW_TL) {
565                                 /*
566                                 ?FIXME?
567                                 Several packets with FLOW_TL (attack)
568                                 */
569                                 flown->sp = flow->sp;
570                                 flown->dp = flow->dp;
571                         }
572                         if (flow->flags & FLOW_LASTFRAG) {
573                                 /*
574                                 ?FIXME?
575                                 Several packets with FLOW_LASTFRAG (attack)
576                                 */
577                                 flown->sizeP = flow->sizeP;
578                         }
579                         flown->flags |= flow->flags;
580                         flown->sizeF += flow->sizeF;
581                         if ((flown->flags & FLOW_LASTFRAG)
582                                 && (flown->sizeF >= flown->sizeP)) {
583                                 /* All fragments received - flow reassembled */
584                                 *flowpp = flown->next;
585                                 pthread_mutex_unlock(&flows_mutex[h]);
586 #if ((DEBUG) & DEBUG_I)
587                                 flows_total--;
588                                 flows_fragmented--;
589 #endif
590                                 flown->id = 0;
591                                 flown->flags &= ~FLOW_FRAG;
592 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
593                                 strcat(logbuf," R");
594 #endif
595                                 ret = put_into(flown, MOVE_INTO
596 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
597                                                 , logbuf
598 #endif
599                                         );
600                         }
601                 }
602                 if (flag == MOVE_INTO) mem_free(flow);
603         }
604         pthread_mutex_unlock(&flows_mutex[h]);
605         return ret;
606 }
607
608 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
609 {
610         int i;
611
612         for (i = 0; i < fields; i++) {
613 #if ((DEBUG) & DEBUG_F)
614                 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
615 #endif
616                 switch (format[i]) {
617                         case NETFLOW_IPV4_SRC_ADDR:
618                                 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
619                                 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
620                                 break;
621
622                         case NETFLOW_IPV4_DST_ADDR:
623                                 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
624                                 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
625                                         my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
626                                 }
627                                 p += NETFLOW_IPV4_DST_ADDR_SIZE;
628                                 break;
629
630                         case NETFLOW_INPUT_SNMP:
631                                 *((uint16_t *) p) = htons(flow->iif);
632                                 p += NETFLOW_INPUT_SNMP_SIZE;
633                                 break;
634
635                         case NETFLOW_OUTPUT_SNMP:
636                                 *((uint16_t *) p) = htons(flow->oif);
637                                 p += NETFLOW_OUTPUT_SNMP_SIZE;
638                                 break;
639
640                         case NETFLOW_PKTS_32:
641                                 *((uint32_t *) p) = htonl(flow->pkts);
642                                 p += NETFLOW_PKTS_32_SIZE;
643                                 break;
644
645                         case NETFLOW_BYTES_32:
646                                 *((uint32_t *) p) = htonl(flow->size);
647                                 p += NETFLOW_BYTES_32_SIZE;
648                                 break;
649
650                         case NETFLOW_FIRST_SWITCHED:
651                                 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
652                                 p += NETFLOW_FIRST_SWITCHED_SIZE;
653                                 break;
654
655                         case NETFLOW_LAST_SWITCHED:
656                                 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
657                                 p += NETFLOW_LAST_SWITCHED_SIZE;
658                                 break;
659
660                         case NETFLOW_L4_SRC_PORT:
661                                 *((uint16_t *) p) = flow->sp;
662                                 p += NETFLOW_L4_SRC_PORT_SIZE;
663                                 break;
664
665                         case NETFLOW_L4_DST_PORT:
666                                 *((uint16_t *) p) = flow->dp;
667                                 p += NETFLOW_L4_DST_PORT_SIZE;
668                                 break;
669
670                         case NETFLOW_PROT:
671                                 *((uint8_t *) p) = flow->proto;
672                                 p += NETFLOW_PROT_SIZE;
673                                 break;
674
675                         case NETFLOW_SRC_TOS:
676                                 *((uint8_t *) p) = flow->tos;
677                                 p += NETFLOW_SRC_TOS_SIZE;
678                                 break;
679
680                         case NETFLOW_TCP_FLAGS:
681                                 *((uint8_t *) p) = flow->tcp_flags;
682                                 p += NETFLOW_TCP_FLAGS_SIZE;
683                                 break;
684
685                         case NETFLOW_VERSION:
686                                 *((uint16_t *) p) = htons(netflow->Version);
687                                 p += NETFLOW_VERSION_SIZE;
688                                 break;
689
690                         case NETFLOW_COUNT:
691                                 *((uint16_t *) p) = htons(emit_count);
692                                 p += NETFLOW_COUNT_SIZE;
693                                 break;
694
695                         case NETFLOW_UPTIME:
696                                 *((uint32_t *) p) = htonl(getuptime(&emit_time));
697                                 p += NETFLOW_UPTIME_SIZE;
698                                 break;
699
700                         case NETFLOW_UNIX_SECS:
701                                 *((uint32_t *) p) = htonl(emit_time.sec);
702                                 p += NETFLOW_UNIX_SECS_SIZE;
703                                 break;
704
705                         case NETFLOW_UNIX_NSECS:
706                                 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
707                                 p += NETFLOW_UNIX_NSECS_SIZE;
708                                 break;
709
710                         case NETFLOW_FLOW_SEQUENCE:
711                                 //*((uint32_t *) p) = htonl(emit_sequence);
712                                 *((uint32_t *) p) = 0;
713                                 p += NETFLOW_FLOW_SEQUENCE_SIZE;
714                                 break;
715
716                         case NETFLOW_PAD8:
717                         /* Unsupported (uint8_t) */
718                         case NETFLOW_ENGINE_TYPE:
719                         case NETFLOW_ENGINE_ID:
720                         case NETFLOW_FLAGS7_1:
721                         case NETFLOW_SRC_MASK:
722                         case NETFLOW_DST_MASK:
723                                 *((uint8_t *) p) = 0;
724                                 p += NETFLOW_PAD8_SIZE;
725                                 break;
726                         case NETFLOW_XID:
727                                 *((uint16_t *) p) = flow->tos;
728                                 p += NETFLOW_XID_SIZE;
729                                 break;
730                         case NETFLOW_PAD16:
731                         /* Unsupported (uint16_t) */
732                         case NETFLOW_SRC_AS:
733                         case NETFLOW_DST_AS:
734                         case NETFLOW_FLAGS7_2:
735                                 *((uint16_t *) p) = 0;
736                                 p += NETFLOW_PAD16_SIZE;
737                                 break;
738
739                         case NETFLOW_PAD32:
740                         /* Unsupported (uint32_t) */
741                         case NETFLOW_IPV4_NEXT_HOP:
742                         case NETFLOW_ROUTER_SC:
743                                 *((uint32_t *) p) = 0;
744                                 p += NETFLOW_PAD32_SIZE;
745                                 break;
746
747                         default:
748                                 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
749                                         format, i, format[i]);
750                                 exit(1);
751                 }
752         }
753 #if ((DEBUG) & DEBUG_F)
754         my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
755 #endif
756         return p;
757 }
758
759 void setuser() {
760         /*
761         Workaround for clone()-based threads
762         Try to change EUID independently of main thread
763         */
764         if (pw) {
765                 setgroups(0, NULL);
766                 setregid(pw->pw_gid, pw->pw_gid);
767                 setreuid(pw->pw_uid, pw->pw_uid);
768         }
769 }
770
771 void *emit_thread()
772 {
773         struct Flow *flow;
774         void *p;
775         struct timeval now;
776         struct timespec timeout;
777         int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
778
779         p = (void *) &emit_packet + netflow->HeaderSize;
780         timeout.tv_nsec = 0;
781
782         setuser();
783
784         for (;;) {
785                 pthread_mutex_lock(&emit_mutex);
786                 while (!flows_emit) {
787                         gettimeofday(&now, 0);
788                         timeout.tv_sec = now.tv_sec + emit_timeout;
789                         /* Do not wait until emit_packet will filled - it may be too long */
790                         if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
791                                 pthread_mutex_unlock(&emit_mutex);
792                                 goto sendit;
793                         }
794                 }
795                 flow = flows_emit;
796                 flows_emit = flows_emit->next;
797 #if ((DEBUG) & DEBUG_I)
798                 emit_queue--;
799 #endif          
800                 pthread_mutex_unlock(&emit_mutex);
801
802 #ifdef UPTIME_TRICK
803                 if (!emit_count) {
804                         gettime(&start_time);
805                         start_time.sec -= start_time_offset;
806                 }
807 #endif
808                 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
809                 mem_free(flow);
810                 emit_count++;
811 #ifdef PF2_DEBUG
812                 printf("Emit count = %d\n", emit_count);
813                 fflush(stdout);
814 #endif
815                 if (emit_count == netflow->MaxFlows) {
816                 sendit:
817                         gettime(&emit_time);
818                         p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
819                         size = netflow->HeaderSize + emit_count * netflow->FlowSize;
820                         /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
821                         if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
822                         peer_rot_cur = 0;
823                         for (i = 0; i < npeers; i++) {
824                                 if (peers[i].type == PEER_FILE) {
825                                                 if (netflow->SeqOffset)
826                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
827                                                 peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
828                                                 ret = write(peers[i].write_fd, emit_packet, size);
829                                                 if (ret < size) {
830
831 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
832                                                         my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
833                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
834 #endif
835 #undef MESSAGES
836                                                 }
837 #if ((DEBUG) & DEBUG_E)
838                                                 commaneelse {
839                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
840                                                                 emit_count, i + 1, peers[i].seq);
841                                                 }
842 #endif
843                                                 peers[i].seq += emit_count;
844
845                                                 /* Rate limit */
846                                                 if (emit_rate_bytes) {
847                                                         sent += size;
848                                                         delay = sent / emit_rate_bytes;
849                                                         if (delay) {
850                                                                 sent %= emit_rate_bytes;
851                                                                 timeout.tv_sec = 0;
852                                                                 timeout.tv_nsec = emit_rate_delay * delay;
853                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
854                                                         }
855                                                 }
856                                         }
857                                 else
858                                 if (peers[i].type == PEER_MIRROR) goto sendreal;
859                                 else
860                                 if (peers[i].type == PEER_ROTATE) 
861                                         if (peer_rot_cur++ == peer_rot_work) {
862                                         sendreal:
863                                                 if (netflow->SeqOffset)
864                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
865                                                 ret = send(peers[i].write_fd, emit_packet, size, 0);
866                                                 if (ret < size) {
867 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
868                                                         my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
869                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
870 #endif
871                                                 }
872 #if ((DEBUG) & DEBUG_E)
873                                                 commaneelse {
874                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
875                                                                 emit_count, i + 1, peers[i].seq);
876                                                 }
877 #endif
878                                                 peers[i].seq += emit_count;
879
880                                                 /* Rate limit */
881                                                 if (emit_rate_bytes) {
882                                                         sent += size;
883                                                         delay = sent / emit_rate_bytes;
884                                                         if (delay) {
885                                                                 sent %= emit_rate_bytes;
886                                                                 timeout.tv_sec = 0;
887                                                                 timeout.tv_nsec = emit_rate_delay * delay;
888                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
889                                                         }
890                                                 }
891                                         }
892                         }
893                         if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
894                         emit_sequence += emit_count;
895                         emit_count = 0;
896 #if ((DEBUG) & DEBUG_I)
897                         emit_pkts++;
898 #endif
899                 }
900         }
901 }       
902
903 void *unpending_thread()
904 {
905         struct timeval now;
906         struct timespec timeout;
907 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
908         char logbuf[256];
909 #endif
910
911         setuser();
912
913         timeout.tv_nsec = 0;
914         pthread_mutex_lock(&unpending_mutex);
915
916         for (;;) {
917                 while (!(pending_tail->flags & FLOW_PENDING)) {
918                         gettimeofday(&now, 0);
919                         timeout.tv_sec = now.tv_sec + unpending_timeout;
920                         pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
921                 }
922
923 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
924                 *logbuf = 0;
925 #endif
926                 if (put_into(pending_tail, COPY_INTO
927 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
928                                 , logbuf
929 #endif
930                         ) < 0) {
931 #if ((DEBUG) & DEBUG_I)
932                         pkts_lost_unpending++;
933 #endif                          
934                 }
935
936 #if ((DEBUG) & DEBUG_U)
937                 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
938 #endif
939
940                 pending_tail->flags = 0;
941                 pending_tail = pending_tail->next;
942 #if ((DEBUG) & DEBUG_I)
943                 pkts_pending_done++;
944 #endif
945         }
946 }
947
948 void *scan_thread()
949 {
950 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
951         char logbuf[256];
952 #endif
953         int i;
954         struct Flow *flow, **flowpp;
955         struct Time now;
956         struct timespec timeout;
957
958         setuser();
959
960         timeout.tv_nsec = 0;
961         pthread_mutex_lock(&scan_mutex);
962
963         for (;;) {
964                 gettime(&now);
965                 timeout.tv_sec = now.sec + scan_interval;
966                 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
967
968                 gettime(&now);
969 #if ((DEBUG) & DEBUG_S)
970                 my_log(LOG_DEBUG, "S: %d", now.sec);
971 #endif
972                 for (i = 0; i < 1 << HASH_BITS ; i++) {
973                         pthread_mutex_lock(&flows_mutex[i]);
974                         flow = flows[i];
975                         flowpp = &flows[i];
976                         while (flow) {
977                                 if (flow->flags & FLOW_FRAG) {
978                                         /* Process fragmented flow */
979                                         if ((now.sec - flow->mtime.sec) > frag_lifetime) {
980                                                 /* Fragmented flow expired - put it into special chain */
981 #if ((DEBUG) & DEBUG_I)
982                                                 flows_fragmented--;
983                                                 flows_total--;
984 #endif
985                                                 *flowpp = flow->next;
986                                                 flow->id = 0;
987                                                 flow->flags &= ~FLOW_FRAG;
988                                                 flow->next = scan_frag_dreg;
989                                                 scan_frag_dreg = flow;
990                                                 flow = *flowpp;
991                                                 continue;
992                                         }
993                                 } else {
994                                         /* Flow is not frgamented */
995                                         if ((now.sec - flow->mtime.sec) > inactive_lifetime
996                                                 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
997                                                 /* Flow expired */
998 #if ((DEBUG) & DEBUG_S)
999                                                 my_log(LOG_DEBUG, "S: E %x", flow);
1000 #endif
1001 #if ((DEBUG) & DEBUG_I)
1002                                                 flows_total--;
1003 #endif
1004                                                 *flowpp = flow->next;
1005                                                 pthread_mutex_lock(&emit_mutex);
1006                                                 flow->next = flows_emit;
1007                                                 flows_emit = flow;
1008 #if ((DEBUG) & DEBUG_I)
1009                                                 emit_queue++;
1010 #endif                          
1011                                                 pthread_mutex_unlock(&emit_mutex);
1012                                                 flow = *flowpp;
1013                                                 continue;
1014                                         }
1015                                 }
1016                                 flowpp = &flow->next;
1017                                 flow = flow->next;
1018                         } /* chain loop */
1019                         pthread_mutex_unlock(&flows_mutex[i]);
1020                 } /* hash loop */
1021                 if (flows_emit) pthread_cond_signal(&emit_cond);
1022
1023                 while (scan_frag_dreg) {
1024                         flow = scan_frag_dreg;
1025                         scan_frag_dreg = flow->next;
1026 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1027                         *logbuf = 0;
1028 #endif
1029                         put_into(flow, MOVE_INTO
1030 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1031                                 , logbuf
1032 #endif
1033                         );
1034 #if ((DEBUG) & DEBUG_S)
1035                         my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
1036 #endif
1037                 }
1038         }
1039 }
1040
1041 void *cap_thread()
1042 {
1043         struct ulog_packet_msg *ulog_msg;
1044         struct ip *nl;
1045         void *tl;
1046         struct Flow *flow;
1047         int len, off_frag, psize;
1048 #if ((DEBUG) & DEBUG_C)
1049         char buf[64];
1050         char logbuf[256];
1051 #endif
1052
1053         setuser();
1054
1055         while (!killed) {
1056                 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1057                 if (len <= 0) {
1058                         my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1059                         continue;
1060                 }
1061                 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1062
1063 #if ((DEBUG) & DEBUG_C)
1064                         sprintf(logbuf, "C: %d", ulog_msg->data_len);
1065 #endif
1066
1067                         nl = (void *) &ulog_msg->payload;
1068                         psize = ulog_msg->data_len;
1069
1070                         /* Sanity check */
1071                         if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1072 #if ((DEBUG) & DEBUG_C)
1073                                 strcat(logbuf, " U");
1074                                 my_log(LOG_DEBUG, "%s", logbuf);
1075 #endif
1076 #if ((DEBUG) & DEBUG_I)
1077                                 pkts_ignored++;
1078 #endif
1079                                 continue;
1080                         }
1081
1082                         if (pending_head->flags) {
1083 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1084                                 my_log(LOG_ERR,
1085 # if ((DEBUG) & DEBUG_C)
1086                                         "%s %s %s", logbuf,
1087 # else
1088                                         "%s %s",
1089 # endif
1090                                         "pending queue full:", "packet lost");
1091 #endif
1092 #if ((DEBUG) & DEBUG_I)
1093                                 pkts_lost_capture++;
1094 #endif
1095                                 goto done;
1096                         }
1097
1098 #if ((DEBUG) & DEBUG_I)
1099                         pkts_total++;
1100 #endif
1101
1102                         flow = pending_head;
1103
1104                         /* ?FIXME? Add sanity check for ip_len? */
1105                         flow->size = ntohs(nl->ip_len);
1106 #if ((DEBUG) & DEBUG_I)
1107                         size_total += flow->size;
1108 #endif
1109
1110                         flow->sip = nl->ip_src;
1111                         flow->dip = nl->ip_dst;
1112                         if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
1113                                 my_log(LOG_INFO, "Received test flow to corewars.org");
1114                         }
1115                         flow->iif = snmp_index(ulog_msg->indev_name);
1116                         flow->oif = snmp_index(ulog_msg->outdev_name);
1117                         flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1118                         flow->proto = nl->ip_p;
1119                         flow->id = 0;
1120                         flow->tcp_flags = 0;
1121                         flow->pkts = 1;
1122                         flow->sizeF = 0;
1123                         flow->sizeP = 0;
1124                         /* Packets captured from OUTPUT table didn't contains valid timestamp */
1125                         if (ulog_msg->timestamp_sec) {
1126                                 flow->ctime.sec = ulog_msg->timestamp_sec;
1127                                 flow->ctime.usec = ulog_msg->timestamp_usec;
1128                         } else gettime(&flow->ctime);
1129                         flow->mtime = flow->ctime;
1130
1131                         off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1132
1133                         /*
1134                         Offset (from network layer) to transport layer header/IP data
1135                         IOW IP header size ;-)
1136
1137                         ?FIXME?
1138                         Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1139                         */
1140                         off_tl = nl->ip_hl << 2;
1141                         tl = (void *) nl + off_tl;
1142
1143                         /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1144                         flow->sizeF = ntohs(nl->ip_len) - off_tl;
1145                         psize -= off_tl;
1146                         if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1147                         if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1148
1149                         if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1150                                 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1151 #if ((DEBUG) & DEBUG_C)
1152                                 strcat(logbuf, " F");
1153 #endif
1154 #if ((DEBUG) & DEBUG_I)
1155                                 pkts_total_fragmented++;
1156 #endif
1157                                 flow->flags |= FLOW_FRAG;
1158                                 flow->id = nl->ip_id;
1159
1160                                 if (!(ntohs(nl->ip_off) & IP_MF)) {
1161                                         /* Packet whith IP_MF contains information about whole datagram size */
1162                                         flow->flags |= FLOW_LASTFRAG;
1163                                         /* size = frag_offset*8 + data_size */
1164                                         flow->sizeP = off_frag + flow->sizeF;
1165                                 }
1166                         }
1167
1168 #if ((DEBUG) & DEBUG_C)
1169                         sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1170                         strcat(logbuf, buf);
1171                         sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1172                         strcat(logbuf, buf);
1173 #endif
1174
1175                         /*
1176                         Fortunately most interesting transport layer information fit
1177                         into first 8 bytes of IP data field (minimal nonzero size).
1178                         Thus we don't need actual packet reassembling to build whole
1179                         transport layer data. We only check the fragment offset for
1180                         zero value to find packet with this information.
1181                         */
1182                         if (!off_frag && psize >= 8) {
1183                                 switch (flow->proto) {
1184                                         case IPPROTO_TCP:
1185                                         case IPPROTO_UDP:
1186                                                 flow->sp = ((struct udphdr *)tl)->uh_sport;
1187                                                 flow->dp = ((struct udphdr *)tl)->uh_dport;
1188                                                 goto tl_known;
1189
1190 #ifdef ICMP_TRICK
1191                                         case IPPROTO_ICMP:
1192                                                 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1193                                                 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1194                                                 goto tl_known;
1195 #endif
1196 #ifdef ICMP_TRICK_CISCO
1197                                         case IPPROTO_ICMP:
1198                                                 flow->dp = *((int32_t *) tl);
1199                                                 goto tl_known;
1200 #endif
1201
1202                                         default:
1203                                                 /* Unknown transport layer */
1204 #if ((DEBUG) & DEBUG_C)
1205                                                 strcat(logbuf, " U");
1206 #endif
1207                                                 flow->sp = 0;
1208                                                 flow->dp = 0;
1209                                                 break;
1210
1211                                         tl_known:
1212 #if ((DEBUG) & DEBUG_C)
1213                                                 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1214                                                 strcat(logbuf, buf);
1215 #endif
1216                                                 flow->flags |= FLOW_TL;
1217                                 }
1218                         }
1219
1220                         /* Check for tcp flags presence (including CWR and ECE). */
1221                         if (flow->proto == IPPROTO_TCP
1222                                 && off_frag < 16
1223                                 && psize >= 16 - off_frag) {
1224                                 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1225 #if ((DEBUG) & DEBUG_C)
1226                                 sprintf(buf, " TCP:%x", flow->tcp_flags);
1227                                 strcat(logbuf, buf);
1228 #endif
1229                         }
1230
1231 #if ((DEBUG) & DEBUG_C)
1232                         sprintf(buf, " => %x", (unsigned) flow);
1233                         strcat(logbuf, buf);
1234                         my_log(LOG_DEBUG, "%s", logbuf);
1235 #endif
1236
1237 #if ((DEBUG) & DEBUG_I)
1238                         pkts_pending++;
1239                         pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1240                         if (pending_queue_trace < pending_queue_trace_candidate)
1241                                 pending_queue_trace = pending_queue_trace_candidate;
1242 #endif
1243
1244                         /* Flow complete - inform unpending_thread() about it */
1245                         pending_head->flags |= FLOW_PENDING;
1246                         pending_head = pending_head->next;
1247                 done:
1248                         pthread_cond_signal(&unpending_cond);
1249                 }
1250         }
1251         return 0;
1252 }
1253
1254 /* Copied out of CoDemux */
1255
1256 static int init_daemon() {
1257   pid_t pid;
1258   FILE *pidfile;
1259
1260   pidfile = fopen(PIDFILE, "w");
1261   if (pidfile == NULL) {
1262     my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
1263   }
1264
1265   if ((pid = fork()) < 0) {
1266     fclose(pidfile);
1267     my_log(LOG_ERR, "Could not fork!\n");
1268     return(-1);
1269   }
1270   else if (pid != 0) {
1271     /* i'm the parent, writing down the child pid  */
1272     fprintf(pidfile, "%u\n", pid);
1273     fclose(pidfile);
1274     exit(0);
1275   }
1276
1277   /* close the pid file */
1278   fclose(pidfile);
1279
1280   /* routines for any daemon process
1281      1. create a new session 
1282      2. change directory to the root
1283      3. change the file creation permission 
1284   */
1285   setsid();
1286   chdir("/root");
1287   umask(0);
1288
1289   return(0);
1290 }
1291
1292 int main(int argc, char **argv)
1293 {
1294         char errpbuf[512];
1295         char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1296         int c, i, write_fd, memory_limit = 0;
1297         struct addrinfo hints, *res;
1298         struct sockaddr_in saddr;
1299         pthread_attr_t tattr;
1300         struct sigaction sigact;
1301         static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1302         struct timeval timeout;
1303
1304         sched_min = sched_get_priority_min(SCHED);
1305         sched_max = sched_get_priority_max(SCHED);
1306
1307         memset(&saddr, 0 , sizeof(saddr));
1308         memset(&hints, 0 , sizeof(hints));
1309         hints.ai_flags = AI_PASSIVE;
1310         hints.ai_family = AF_INET;
1311         hints.ai_socktype = SOCK_DGRAM;
1312
1313         /* Process command line options */
1314
1315         opterr = 0;
1316         while ((c = my_getopt(argc, argv, parms)) != -1) {
1317                 switch (c) {
1318                         case '?':
1319                                 usage();
1320
1321                         case 'h':
1322                                 usage();
1323                 }
1324         }
1325
1326         if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1327         if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1328         if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1329         if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1330         if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1331         if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1332         if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1333         if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1334         if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1335         if (parms[nflag].count) {
1336                 switch (atoi(parms[nflag].arg)) {
1337                         case 1:
1338                                 netflow = &NetFlow1;
1339                                 break;
1340
1341                         case 5:
1342                                 break;
1343
1344                         case 7:
1345                                 netflow = &NetFlow7;
1346                                 break;
1347
1348                         default:
1349                                 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1350                                 exit(1);
1351                 }
1352         }
1353         if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1354         if (parms[lflag].count) {
1355                 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1356                         *log_suffix++ = 0;
1357                         if (*log_suffix) {
1358                                 sprintf(errpbuf, "[%s]", log_suffix);
1359                                 strcat(ident, errpbuf);
1360                         }
1361                 }
1362                 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1363                 if (log_suffix) *--log_suffix = ':';
1364         }
1365         if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1366         err_malloc:
1367                 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1368                 exit(1);
1369         }
1370         sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1371         if (parms[qflag].count) {
1372                 pending_queue_length = atoi(parms[qflag].arg);
1373                 if (pending_queue_length < 1) {
1374                         fprintf(stderr, "Illegal %s\n", "pending queue length");
1375                         exit(1);
1376                 }
1377         }
1378         if (parms[rflag].count) {
1379                 schedp.sched_priority = atoi(parms[rflag].arg);
1380                 if (schedp.sched_priority
1381                         && (schedp.sched_priority < sched_min
1382                                 || schedp.sched_priority > sched_max)) {
1383                         fprintf(stderr, "Illegal %s\n", "realtime priority");
1384                         exit(1);
1385                 }
1386         }
1387         if (parms[Bflag].count) {
1388                 sockbufsize = atoi(parms[Bflag].arg) << 10;
1389         }
1390         if (parms[bflag].count) {
1391                 bulk_quantity = atoi(parms[bflag].arg);
1392                 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1393                         fprintf(stderr, "Illegal %s\n", "bulk size");
1394                         exit(1);
1395                 }
1396         }
1397         if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1398         if (parms[Xflag].count) {
1399                 for(i = 0; parms[Xflag].arg[i]; i++)
1400                         if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1401                 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1402                         goto err_malloc;
1403                 rule = strtok(parms[Xflag].arg, ":");
1404                 for (i = 0; rule; i++) {
1405                         snmp_rules[i].len = strlen(rule);
1406                         if (snmp_rules[i].len > IFNAMSIZ) {
1407                                 fprintf(stderr, "Illegal %s\n", "interface basename");
1408                                 exit(1);
1409                         }
1410                         strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1411                         if (!*(rule - 1)) *(rule - 1) = ',';
1412                         rule = strtok(NULL, ",");
1413                         if (!rule) {
1414                                 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1415                                 exit(1);
1416                         }
1417                         snmp_rules[i].base = atoi(rule);
1418                         *(rule - 1) = ':';
1419                         rule = strtok(NULL, ":");
1420                 }
1421                 nsnmp_rules = i;
1422         }
1423         if (parms[tflag].count)
1424                 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1425         if (parms[aflag].count) {
1426                 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1427                 bad_lhost:
1428                         fprintf(stderr, "Illegal %s\n", "source address");
1429                         exit(1);
1430                 } else {
1431                         saddr = *((struct sockaddr_in *) res->ai_addr);
1432                         freeaddrinfo(res);
1433                 }
1434         }
1435         if (parms[uflag].count) 
1436                 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1437                         fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1438                         exit(1);
1439                 }
1440
1441
1442         /* Process collectors parameters. Brrrr... :-[ */
1443
1444         npeers = argc - optind;
1445         if (npeers > 1) {
1446                 /* Send to remote Netflow collector */
1447                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1448                 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1449                         dhost = argv[i];
1450                         if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1451                         *dport++ = 0;
1452                         if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1453                                 fprintf(stderr, "socket(): %s\n", strerror(errno));
1454                                 exit(1);
1455                         }
1456                         peers[npeers].write_fd = write_fd;
1457                         peers[npeers].type = PEER_MIRROR;
1458                         peers[npeers].laddr = saddr;
1459                         peers[npeers].seq = 0;
1460                         if ((lhost = strchr(dport, '/'))) {
1461                                 *lhost++ = 0;
1462                                 if ((type = strchr(lhost, '/'))) {
1463                                         *type++ = 0;
1464                                         switch (*type) {
1465                                                 case 0:
1466                                                 case 'm':
1467                                                         break;
1468
1469                                                 case 'r':
1470                                                         peers[npeers].type = PEER_ROTATE;
1471                                                         npeers_rot++;
1472                                                         break;
1473
1474                                                 default:
1475                                                         goto bad_collector;
1476                                         }
1477                                 }
1478                                 if (*lhost) {
1479                                         if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1480                                         peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1481                                         freeaddrinfo(res);
1482                                 }
1483                         }
1484                         if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1485                                                 sizeof(struct sockaddr_in))) {
1486                                 fprintf(stderr, "bind(): %s\n", strerror(errno));
1487                                 exit(1);
1488                         }
1489                         if (getaddrinfo(dhost, dport, &hints, &res)) {
1490 bad_collector:
1491                                 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1492                                 exit(1);
1493                         }
1494                         peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1495                         freeaddrinfo(res);
1496                         if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1497                                                 sizeof(struct sockaddr_in))) {
1498                                 fprintf(stderr, "connect(): %s\n", strerror(errno));
1499                                 exit(1);
1500                         }
1501
1502                         /* Restore command line */
1503                         if (type) *--type = '/';
1504                         if (lhost) *--lhost = '/';
1505                         *--dport = ':';
1506                 }
1507         }
1508         else if (parms[fflag].count) {
1509                 // log into a file
1510                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1511                 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1512                 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1513                 
1514                 peers[npeers].write_fd = START_VALUE;
1515                 peers[npeers].type = PEER_FILE;
1516                 peers[npeers].seq = 0;
1517
1518                 get_cur_epoch();
1519                 npeers++;
1520         }
1521         else 
1522                 usage();
1523
1524
1525         if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1526         ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1527         if (!ulog_handle) {
1528                 fprintf(stderr, "libipulog initialization error: %s",
1529                         ipulog_strerror(ipulog_errno));
1530                 exit(1);
1531         }
1532         if (sockbufsize)
1533                 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1534                         &sockbufsize, sizeof(sockbufsize)) < 0)
1535                         fprintf(stderr, "setsockopt(): %s", strerror(errno));
1536
1537         /* Daemonize (if log destination stdout-free) */
1538
1539         my_log_open(ident, verbosity, log_dest);
1540
1541         init_daemon();
1542
1543         if (!(log_dest & 2)) {
1544                 /* Crash-proofing - Sapan*/
1545                 while (1) {
1546                         int pid=fork();
1547                         if (pid==-1) {
1548                                         fprintf(stderr, "fork(): %s", strerror(errno));
1549                                         exit(1);
1550                         }
1551                         else if (pid==0) {
1552                                         setsid();
1553                                         freopen("/dev/null", "r", stdin);
1554                                         freopen("/dev/null", "w", stdout);
1555                                         freopen("/dev/null", "w", stderr);
1556                                         break;
1557                         }
1558                         else {
1559                                 while (wait3(NULL,0,NULL) < 1);
1560                         }
1561                 }
1562         } else {
1563                 setvbuf(stdout, (char *)0, _IONBF, 0);
1564                 setvbuf(stderr, (char *)0, _IONBF, 0);
1565         }
1566
1567         pid = getpid();
1568         sprintf(errpbuf, "[%ld]", (long) pid);
1569         strcat(ident, errpbuf);
1570
1571         /* Initialization */
1572
1573         hash_init(); /* Actually for crc16 only */
1574         mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1575         for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1576
1577 #ifdef UPTIME_TRICK
1578         /* Hope 12 days is enough :-/ */
1579         start_time_offset = 1 << 20;
1580
1581         /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1582 #endif
1583         gettime(&start_time);
1584
1585         /*
1586         Build static pending queue as circular buffer.
1587         */
1588         if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1589         pending_tail = pending_head;
1590         for (i = pending_queue_length - 1; i--;) {
1591                 if (!(pending_tail->next = mem_alloc())) {
1592                 err_mem_alloc:
1593                         my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1594                         exit(1);
1595                 }
1596                 pending_tail = pending_tail->next;
1597         }
1598         pending_tail->next = pending_head;
1599         pending_tail = pending_head;
1600
1601         sigemptyset(&sig_mask);
1602         sigact.sa_handler = &sighandler;
1603         sigact.sa_mask = sig_mask;
1604         sigact.sa_flags = 0;
1605         sigaddset(&sig_mask, SIGTERM);
1606         sigaction(SIGTERM, &sigact, 0);
1607 #if ((DEBUG) & DEBUG_I)
1608         sigaddset(&sig_mask, SIGUSR1);
1609         sigaction(SIGUSR1, &sigact, 0);
1610 #endif
1611         if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1612                 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1613                 exit(1);
1614         }
1615
1616         my_log(LOG_INFO, "Starting %s...", VERSION);
1617
1618         if (parms[cflag].count) {
1619                 if (chdir(parms[cflag].arg) || chroot(".")) {
1620                         my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1621                         exit(1);
1622                 }
1623         }
1624
1625         schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1626         pthread_attr_init(&tattr);
1627         for (i = 0; i < THREADS - 1; i++) {
1628                 if (schedp.sched_priority > 0) {
1629                         if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1630                                 (pthread_attr_setschedparam(&tattr, &schedp))) {
1631                                 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1632                                 exit(1);
1633                         }
1634                 }
1635                 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1636                         my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1637                         exit(1);
1638                 }
1639                 pthread_detach(thid);
1640                 schedp.sched_priority++;
1641         }
1642
1643         if (pw) {
1644                 if (setgroups(0, NULL)) {
1645                         my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1646                         exit(1);
1647                 }
1648                 if (setregid(pw->pw_gid, pw->pw_gid)) {
1649                         my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1650                         exit(1);
1651                 }
1652                 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1653                         my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1654                         exit(1);
1655                 }
1656         }
1657
1658         if (!(pidfile = fopen(pidfilepath, "w")))
1659                 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1660         else {
1661                 fprintf(pidfile, "%ld\n", (long) pid);
1662                 fclose(pidfile);
1663         }
1664
1665         my_log(LOG_INFO, "pid: %d", pid);
1666         my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1667                 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1668                 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1669                 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1670                 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1671                 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1672                 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1673         for (i = 0; i < nsnmp_rules; i++) {
1674                 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1675                         i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1676         }
1677         for (i = 0; i < npeers; i++) {
1678                 switch (peers[i].type) {
1679                         case PEER_MIRROR:
1680                                 c = 'm';
1681                                 break;
1682                         case PEER_ROTATE:
1683                                 c = 'r';
1684                                 break;
1685                 }
1686                 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1687                 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1688                         inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1689         }
1690
1691         pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1692
1693         timeout.tv_usec = 0;
1694         while (!killed
1695                 || (total_elements - free_elements - pending_queue_length)
1696                 || emit_count
1697                 || pending_tail->flags) {
1698
1699                 if (!sigs) {
1700                         timeout.tv_sec = scan_interval;
1701                         select(0, 0, 0, 0, &timeout);
1702                 }
1703
1704                 if (sigs & SIGTERM_MASK && !killed) {
1705                         sigs &= ~SIGTERM_MASK;
1706                         my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1707                         scan_interval = 1;
1708                         frag_lifetime = -1;
1709                         active_lifetime = -1;
1710                         inactive_lifetime = -1;
1711                         emit_timeout = 1;
1712                         unpending_timeout = 1;
1713                         killed = 1;
1714                         pthread_cond_signal(&scan_cond);
1715                         pthread_cond_signal(&unpending_cond);
1716                 }
1717
1718 #if ((DEBUG) & DEBUG_I)
1719                 if (sigs & SIGUSR1_MASK) {
1720                         sigs &= ~SIGUSR1_MASK;
1721                         info_debug();
1722                 }
1723 #endif
1724         }
1725         remove(pidfilepath);
1726 #if ((DEBUG) & DEBUG_I)
1727         info_debug();
1728 #endif
1729         my_log(LOG_INFO, "Done.");
1730 #ifdef WALL
1731         return 0;
1732 #endif
1733 }