Making records a bit more compact
[fprobe-ulog.git] / src / fprobe-ulog.c
1 /*
2         Copyright (C) Slava Astashonok <sla@0n.ru>
3
4         This program is free software; you can redistribute it and/or
5         modify it under the terms of the GNU General Public License.
6
7         $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
8
9                         Sapan Bhatia <sapanb@cs.princeton.edu> 
10                         
11         7/11/2007       Added data collection (-f) functionality, xid support in the header and log file
12                         rotation.
13
14         15/11/2007      Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
15
16 */
17
18 #include <common.h>
19
20 /* stdout, stderr, freopen() */
21 #include <stdio.h>
22
23 /* atoi(), exit() */
24 #include <stdlib.h>
25
26 /* getopt(), alarm(), getpid(), setsid(), chdir() */
27 #include <unistd.h>
28
29 /* strerror() */
30 #include <string.h>
31
32 /* sig*() */
33 #include <signal.h>
34
35 /* statfs() */
36
37 #include <sys/vfs.h>
38
39 #include <libipulog/libipulog.h>
40 struct ipulog_handle {
41         int fd;
42         u_int8_t blocking;
43         struct sockaddr_nl local;
44         struct sockaddr_nl peer;
45         struct nlmsghdr* last_nlhdr;
46 };
47
48 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
49 #include <sys/types.h>
50 #include <netinet/in_systm.h>
51 #include <sys/socket.h>
52 #include <netinet/in.h>
53 #include <arpa/inet.h>
54 #include <netinet/ip.h>
55 #include <netinet/tcp.h>
56 #include <netinet/udp.h>
57 #include <netinet/ip_icmp.h>
58 #include <net/if.h>
59
60 #include <sys/param.h>
61 #include <pwd.h>
62 #ifdef OS_LINUX
63 #include <grp.h>
64 #endif
65
66 /* pthread_*() */
67 #include <pthread.h>
68
69 /* errno */
70 #include <errno.h>
71
72 /* getaddrinfo() */
73 #include <netdb.h>
74
75 /* nanosleep() */
76 #include <time.h>
77
78 /* gettimeofday() */
79 #include <sys/time.h>
80
81 /* scheduling */
82 #include <sched.h>
83
84 /* select() (POSIX)*/
85 #include <sys/select.h>
86
87 /* open() */
88 #include <sys/stat.h>
89 #include <fcntl.h>
90
91 #include <fprobe-ulog.h>
92 #include <my_log.h>
93 #include <my_getopt.h>
94 #include <netflow.h>
95 #include <hash.h>
96 #include <mem.h>
97
98 #define PIDFILE "/var/log/fprobe-ulog.pid"
99
/*
 * Symbolic indices into the parms[] option table (see mark_is_tos, which
 * expands to parms[Mflag].count).  Each enumerator is named after its
 * option letter; the order here has to track the row order of parms[].
 */
enum {
	aflag, Bflag, bflag, cflag,
	dflag, Dflag, eflag, Eflag,
	fflag, gflag, hflag, lflag,
	mflag, Mflag, nflag, qflag,
	rflag, sflag, tflag, Tflag,
	Uflag, uflag, vflag, Wflag,
	Xflag,
};
127
128 static struct getopt_parms parms[] = {
129         {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
130         {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
131         {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
132         {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
133         {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
134         {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
135         {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
136         {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
137         {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
138         {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
139         {'h', 0, 0, 0},
140         {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
141         {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
142         {'M', 0, 0, 0},
143         {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
144         {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
145         {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
146         {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
147         {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
148         {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
149         {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
150         {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
151         {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
152         {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
153         {0, 0, 0, 0}
154 };
155
/* getopt() state (also declared by <unistd.h>) */
extern char *optarg;
extern int optind, opterr, optopt;
/*
 * errno is deliberately NOT redeclared here: <errno.h> (included above)
 * provides it, typically as a macro for a thread-local lvalue, and ISO C
 * makes a program-level declaration of the name undefined behaviour.
 */

/* Export format descriptors; the definitions are not in this file. */
extern struct NetFlow NetFlow1;
extern struct NetFlow NetFlow5;
extern struct NetFlow NetFlow7;
163
164 #define START_VALUE -5
165 #define mark_is_tos parms[Mflag].count
166 static unsigned scan_interval = 5;
167 static int min_free = 0;
168 static int frag_lifetime = 30;
169 static int inactive_lifetime = 60;
170 static int active_lifetime = 300;
171 static int sockbufsize;
172 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
173 #if (MEM_BITS == 0) || (MEM_BITS == 16)
174 #define BULK_QUANTITY 10000
175 #else
176 #define BULK_QUANTITY 200
177 #endif
178
179 static unsigned epoch_length=60, log_epochs=1; 
180 static unsigned cur_epoch=0,prev_uptime=0;
181
182 static unsigned bulk_quantity = BULK_QUANTITY;
183 static unsigned pending_queue_length = 100;
184 static struct NetFlow *netflow = &NetFlow5;
185 static unsigned verbosity = 6;
186 static unsigned log_dest = MY_LOG_SYSLOG;
187 static struct Time start_time;
188 static long start_time_offset;
189 static int off_tl;
190 /* From mem.c */
191 extern unsigned total_elements;
192 extern unsigned free_elements;
193 extern unsigned total_memory;
194 #if ((DEBUG) & DEBUG_I)
195 static unsigned emit_pkts, emit_queue;
196 static uint64_t size_total;
197 static unsigned pkts_total, pkts_total_fragmented;
198 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
199 static unsigned pkts_pending, pkts_pending_done;
200 static unsigned pending_queue_trace, pending_queue_trace_candidate;
201 static unsigned flows_total, flows_fragmented;
202 #endif
203 static unsigned emit_count;
204 static uint32_t emit_sequence;
205 static unsigned emit_rate_bytes, emit_rate_delay;
206 static struct Time emit_time;
207 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
208 static pthread_t thid;
209 static sigset_t sig_mask;
210 static struct sched_param schedp;
211 static int sched_min, sched_max;
212 static int npeers, npeers_rot;
213 static struct peer *peers;
214 static int sigs;
215
216 static struct Flow *flows[1 << HASH_BITS];
217 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
218
219 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
220 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
221
222 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
223 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
224 static struct Flow *pending_head, *pending_tail;
225 static struct Flow *scan_frag_dreg;
226
227 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
228 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
229 static struct Flow *flows_emit;
230
231 static char ident[256] = "fprobe-ulog";
232 static FILE *pidfile;
233 static char *pidfilepath;
234 static pid_t pid;
235 static int killed;
236 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
237 static struct ipulog_handle *ulog_handle;
238 static uint32_t ulog_gmask = 1;
239 static char *cap_buf;
240 static int nsnmp_rules;
241 static struct snmp_rule *snmp_rules;
242 static struct passwd *pw = 0;
243
244 void usage()
245 {
246         fprintf(stdout,
247                 "fprobe-ulog: a NetFlow probe. Version %s\n"
248                 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
249                 "\n"
250                 "-h\t\tDisplay this help\n"
251                 "-U <mask>\tULOG group bitwise mask [1]\n"
252                 "-s <seconds>\tHow often scan for expired flows [5]\n"
253                 "-g <seconds>\tFragmented flow lifetime [30]\n"
254                 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
255                 "-f <filename>\tLog flow data in a file\n"
256                 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
257                 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
258                 "-a <address>\tUse <address> as source for NetFlow flow\n"
259                 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
260                 "-M\t\tUse netfilter mark value as ToS flag\n"
261                 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
262                 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
263                 "-q <flows>\tPending queue length [100]\n"
264                 "-B <kilobytes>\tKernel capture buffer size [0]\n"
265                 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
266                 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
267                 "-c <directory>\tDirectory to chroot to\n"
268                 "-u <user>\tUser to run as\n"
269                 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
270                 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
271                 "-y <remote:port>\tAddress of the NetFlow collector\n"
272                 "-f <writable file>\tFile to write data into\n"
273                 "-T <n>\tRotate log file every n epochs\n"
274                 "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
275                 "-E <[1..60]>\tSize of an epoch in minutes\n"
276                 "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
277                 ,
278                 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
279         exit(0);
280 }
281
#if ((DEBUG) & DEBUG_I)
/*
 * Dump the DEBUG_I statistics counters at LOG_DEBUG level.
 *
 * All counters are unsigned, so they are printed with %u; the 64-bit byte
 * total is widened explicitly to match %llu.
 */
void info_debug()
{
	my_log(LOG_DEBUG, "I: received:%u/%u (%llu) pending:%u/%u",
		pkts_total, pkts_total_fragmented, (unsigned long long) size_total,
		pkts_pending - pkts_pending_done, pending_queue_trace);
	my_log(LOG_DEBUG, "I: ignored:%u lost:%u+%u",
		pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
	my_log(LOG_DEBUG, "I: cache:%u/%u emit:%u/%u/%u",
		flows_total, flows_fragmented, (unsigned) emit_sequence, emit_pkts, emit_queue);
	my_log(LOG_DEBUG, "I: memory:%u/%u (%u)",
		total_elements, free_elements, total_memory);
}
#endif
296
297 void sighandler(int sig)
298 {
299         switch (sig) {
300                 case SIGTERM:
301                         sigs |= SIGTERM_MASK;
302                         break;
303 #if ((DEBUG) & DEBUG_I)
304                 case SIGUSR1:
305                         sigs |= SIGUSR1_MASK;
306                         break;
307 #endif
308         }
309 }
310
311 void gettime(struct Time *now)
312 {
313         struct timeval t;
314
315         gettimeofday(&t, 0);
316         now->sec = t.tv_sec;
317         now->usec = t.tv_usec;
318 }
319
320
321 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
322 {
323         return (t1->sec - t2->sec)/60;
324 }
325
326 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
327 {
328         return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
329 }
330
331 /* Uptime in miliseconds */
332 uint32_t getuptime(struct Time *t)
333 {
334         /* Maximum uptime is about 49/2 days */
335         return cmpmtime(t, &start_time);
336 }
337
338 /* Uptime in minutes */
339 uint32_t getuptime_minutes(struct Time *t)
340 {
341         /* Maximum uptime is about 49/2 days */
342         return cmpMtime(t, &start_time);
343 }
344
345 hash_t hash_flow(struct Flow *flow)
346 {
347         if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
348         else return hash(flow, sizeof(struct Flow_TL));
349 }
350
351 uint16_t snmp_index(char *name) {
352         uint32_t i;
353
354         if (!*name) return 0;
355
356         for (i = 0; (int) i < nsnmp_rules; i++) {
357                 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
358                 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
359         }
360
361         if ((i = if_nametoindex(name))) return i;
362
363         return -1;
364 }
365
366 inline void copy_flow(struct Flow *src, struct Flow *dst)
367 {
368         dst->iif = src->iif;
369         dst->oif = src->oif;
370         dst->sip = src->sip;
371         dst->dip = src->dip;
372         dst->tos = src->tos;
373         dst->proto = src->proto;
374         dst->tcp_flags = src->tcp_flags;
375         dst->id = src->id;
376         dst->sp = src->sp;
377         dst->dp = src->dp;
378         dst->pkts = src->pkts;
379         dst->size = src->size;
380         dst->sizeF = src->sizeF;
381         dst->sizeP = src->sizeP;
382         dst->ctime = src->ctime;
383         dst->mtime = src->mtime;
384         dst->flags = src->flags;
385 }
386
387 void get_cur_epoch() {
388         int fd;
389         fd = open("/tmp/fprobe_last_epoch",O_RDONLY);
390         if (fd != -1) {
391                 char snum[7];
392                 ssize_t len;
393                 len = read(fd, snum, sizeof(snum)-1);
394                 if (len != -1) {
395                         snum[len]='\0';
396                         sscanf(snum,"%d",&cur_epoch);
397                         cur_epoch++; /* Let's not stone the last epoch */
398                         close(fd);
399                 }
400         }
401         return;
402 }
403
404
405 void update_cur_epoch_file(int n) {
406         int fd, len;
407         char snum[7];
408         len=snprintf(snum,6,"%d",n);
409         fd = open("/tmp/fprobe_last_epoch",O_WRONLY|O_CREAT|O_TRUNC);
410         if (fd == -1) {
411                 my_log(LOG_ERR, "open() failed: /tmp/fprobe_last_epoch.The next restart will resume logging from epoch id 0.");
412                 return;
413         }
414         write(fd, snum, len);
415         close(fd);
416 }
417
418 unsigned get_log_fd(char *fname, int cur_fd) {
419         struct Time now;
420         unsigned cur_uptime;
421         /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
422          * doesn't solve the problem */
423
424         struct statfs statfs;
425         int ret_fd;
426         gettime(&now);
427         cur_uptime = getuptime_minutes(&now);
428
429
430         if (cur_fd!=START_VALUE) {
431                 if (fstatfs(cur_fd, &statfs) == -1) {
432                         my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
433                 }
434                 else {
435                         if (min_free && (statfs.f_bavail < min_free)) 
436                                 switch(cur_epoch) {
437                                         case 0: /* Uh oh. Our first file filled up all of the free space. Just bail out. */
438                                                 my_log(LOG_ERR, "The first epoch filled up all the free space on disk. Bailing out.");
439                                                 exit(1);
440                                         default:
441                                                 my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
442                                                 cur_epoch = -1;
443                                 }
444                 }
445         }
446
447         /* Epoch length in minutes */
448         if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
449                 char nextname[MAX_PATH_LEN];
450                 int write_fd;
451                 prev_uptime = cur_uptime;
452                 cur_epoch = (cur_epoch + 1) % log_epochs;
453                 if (cur_fd>0)
454                         close(cur_fd);
455                 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
456                 if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC)) < 0) {
457                         my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
458                         exit(1);
459                 }
460                 update_cur_epoch_file(cur_epoch);
461                 ret_fd = write_fd;
462         }
463         else {
464                 ret_fd = cur_fd;
465         }
466         return(ret_fd);
467 }
468
469 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
470 {
471         struct Flow **flowpp;
472
473 #ifdef WALL
474         flowpp = 0;
475 #endif
476
477         if (prev) flowpp = *prev;
478
479         while (where) {
480                 if (where->sip.s_addr == what->sip.s_addr
481                         && where->dip.s_addr == what->dip.s_addr
482                         && where->proto == what->proto) {
483                         switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
484                                 case 0:
485                                         /* Both unfragmented */
486                                         if ((what->sp == where->sp)
487                                                 && (what->dp == where->dp)) goto done;
488                                         break;
489                                 case 2:
490                                         /* Both fragmented */
491                                         if (where->id == what->id) goto done;
492                                         break;
493                         }
494                 }
495                 flowpp = &where->next;
496                 where = where->next;
497         }
498 done:
499         if (prev) *prev = flowpp;
500         return where;
501 }
502
503 int put_into(struct Flow *flow, int flag
504 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
505         , char *logbuf
506 #endif
507 )
508 {
509         int ret = 0;
510         hash_t h;
511         struct Flow *flown, **flowpp;
512 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
513         char buf[64];
514 #endif
515
516         h = hash_flow(flow);
517 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
518         sprintf(buf, " %x H:%04x", (unsigned) flow, h);
519         strcat(logbuf, buf);
520 #endif
521         pthread_mutex_lock(&flows_mutex[h]);
522         flowpp = &flows[h];
523         if (!(flown = find(flows[h], flow, &flowpp))) {
524                 /* No suitable flow found - add */
525                 if (flag == COPY_INTO) {
526                         if ((flown = mem_alloc())) {
527                                 copy_flow(flow, flown);
528                                 flow = flown;
529                         } else {
530 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
531                                 my_log(LOG_ERR, "%s %s. %s",
532                                         "mem_alloc():", strerror(errno), "packet lost");
533 #endif
534                                 return -1;
535                         }
536                 }
537                 flow->next = flows[h];
538                 flows[h] = flow;
539 #if ((DEBUG) & DEBUG_I)
540                 flows_total++;
541                 if (flow->flags & FLOW_FRAG) flows_fragmented++;
542 #endif
543 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
544                 if (flown) {
545                         sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
546                         strcat(logbuf, buf);
547                 }
548 #endif
549         } else {
550                 /* Found suitable flow - update */
551 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
552                 sprintf(buf, " +> %x", (unsigned) flown);
553                 strcat(logbuf, buf);
554 #endif
555                 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
556                         flown->mtime = flow->mtime;
557                 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
558                         flown->ctime = flow->ctime;
559                 flown->tcp_flags |= flow->tcp_flags;
560                 flown->size += flow->size;
561                 flown->pkts += flow->pkts;
562                 if (flow->flags & FLOW_FRAG) {
563                         /* Fragmented flow require some additional work */
564                         if (flow->flags & FLOW_TL) {
565                                 /*
566                                 ?FIXME?
567                                 Several packets with FLOW_TL (attack)
568                                 */
569                                 flown->sp = flow->sp;
570                                 flown->dp = flow->dp;
571                         }
572                         if (flow->flags & FLOW_LASTFRAG) {
573                                 /*
574                                 ?FIXME?
575                                 Several packets with FLOW_LASTFRAG (attack)
576                                 */
577                                 flown->sizeP = flow->sizeP;
578                         }
579                         flown->flags |= flow->flags;
580                         flown->sizeF += flow->sizeF;
581                         if ((flown->flags & FLOW_LASTFRAG)
582                                 && (flown->sizeF >= flown->sizeP)) {
583                                 /* All fragments received - flow reassembled */
584                                 *flowpp = flown->next;
585                                 pthread_mutex_unlock(&flows_mutex[h]);
586 #if ((DEBUG) & DEBUG_I)
587                                 flows_total--;
588                                 flows_fragmented--;
589 #endif
590                                 flown->id = 0;
591                                 flown->flags &= ~FLOW_FRAG;
592 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
593                                 strcat(logbuf," R");
594 #endif
595                                 ret = put_into(flown, MOVE_INTO
596 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
597                                                 , logbuf
598 #endif
599                                         );
600                         }
601                 }
602                 if (flag == MOVE_INTO) mem_free(flow);
603         }
604         pthread_mutex_unlock(&flows_mutex[h]);
605         return ret;
606 }
607
608 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
609 {
610         int i;
611
612         for (i = 0; i < fields; i++) {
613 #if ((DEBUG) & DEBUG_F)
614                 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
615 #endif
616                 switch (format[i]) {
617                         case NETFLOW_IPV4_SRC_ADDR:
618                                 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
619                                 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
620                                 break;
621
622                         case NETFLOW_IPV4_DST_ADDR:
623                                 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
624                                 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
625                                         my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
626                                 }
627                                 p += NETFLOW_IPV4_DST_ADDR_SIZE;
628                                 break;
629
630                         case NETFLOW_INPUT_SNMP:
631                                 *((uint16_t *) p) = htons(flow->iif);
632                                 p += NETFLOW_INPUT_SNMP_SIZE;
633                                 break;
634
635                         case NETFLOW_OUTPUT_SNMP:
636                                 *((uint16_t *) p) = htons(flow->oif);
637                                 p += NETFLOW_OUTPUT_SNMP_SIZE;
638                                 break;
639
640                         case NETFLOW_PKTS_32:
641                                 *((uint32_t *) p) = htonl(flow->pkts);
642                                 p += NETFLOW_PKTS_32_SIZE;
643                                 break;
644
645                         case NETFLOW_BYTES_32:
646                                 *((uint32_t *) p) = htonl(flow->size);
647                                 p += NETFLOW_BYTES_32_SIZE;
648                                 break;
649
650                         case NETFLOW_FIRST_SWITCHED:
651                                 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
652                                 p += NETFLOW_FIRST_SWITCHED_SIZE;
653                                 break;
654
655                         case NETFLOW_LAST_SWITCHED:
656                                 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
657                                 p += NETFLOW_LAST_SWITCHED_SIZE;
658                                 break;
659
660                         case NETFLOW_L4_SRC_PORT:
661                                 *((uint16_t *) p) = flow->sp;
662                                 p += NETFLOW_L4_SRC_PORT_SIZE;
663                                 break;
664
665                         case NETFLOW_L4_DST_PORT:
666                                 *((uint16_t *) p) = flow->dp;
667                                 p += NETFLOW_L4_DST_PORT_SIZE;
668                                 break;
669
670                         case NETFLOW_PROT:
671                                 *((uint8_t *) p) = flow->proto;
672                                 p += NETFLOW_PROT_SIZE;
673                                 break;
674
675                         case NETFLOW_SRC_TOS:
676                                 *((uint8_t *) p) = flow->tos;
677                                 p += NETFLOW_SRC_TOS_SIZE;
678                                 break;
679
680                         case NETFLOW_TCP_FLAGS:
681                                 *((uint8_t *) p) = flow->tcp_flags;
682                                 p += NETFLOW_TCP_FLAGS_SIZE;
683                                 break;
684
685                         case NETFLOW_VERSION:
686                                 *((uint16_t *) p) = htons(netflow->Version);
687                                 p += NETFLOW_VERSION_SIZE;
688                                 break;
689
690                         case NETFLOW_COUNT:
691                                 *((uint16_t *) p) = htons(emit_count);
692                                 p += NETFLOW_COUNT_SIZE;
693                                 break;
694
695                         case NETFLOW_UPTIME:
696                                 *((uint32_t *) p) = htonl(getuptime(&emit_time));
697                                 p += NETFLOW_UPTIME_SIZE;
698                                 break;
699
700                         case NETFLOW_UNIX_SECS:
701                                 *((uint32_t *) p) = htonl(emit_time.sec);
702                                 p += NETFLOW_UNIX_SECS_SIZE;
703                                 break;
704
705                         case NETFLOW_UNIX_NSECS:
706                                 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
707                                 p += NETFLOW_UNIX_NSECS_SIZE;
708                                 break;
709
710                         case NETFLOW_FLOW_SEQUENCE:
711                                 //*((uint32_t *) p) = htonl(emit_sequence);
712                                 *((uint32_t *) p) = 0;
713                                 p += NETFLOW_FLOW_SEQUENCE_SIZE;
714                                 break;
715
716                         case NETFLOW_PAD8:
717                         /* Unsupported (uint8_t) */
718                         case NETFLOW_ENGINE_TYPE:
719                         case NETFLOW_ENGINE_ID:
720                         case NETFLOW_FLAGS7_1:
721                         case NETFLOW_SRC_MASK:
722                         case NETFLOW_DST_MASK:
723                                 *((uint8_t *) p) = 0;
724                                 p += NETFLOW_PAD8_SIZE;
725                                 break;
726                         case NETFLOW_XID:
727                                 *((uint16_t *) p) = flow->tos;
728                                 p += NETFLOW_XID_SIZE;
729                                 break;
730                         case NETFLOW_PAD16:
731                         /* Unsupported (uint16_t) */
732                         case NETFLOW_SRC_AS:
733                         case NETFLOW_DST_AS:
734                         case NETFLOW_FLAGS7_2:
735                                 *((uint16_t *) p) = 0;
736                                 p += NETFLOW_PAD16_SIZE;
737                                 break;
738
739                         case NETFLOW_PAD32:
740                         /* Unsupported (uint32_t) */
741                         case NETFLOW_IPV4_NEXT_HOP:
742                         case NETFLOW_ROUTER_SC:
743                                 *((uint32_t *) p) = 0;
744                                 p += NETFLOW_PAD32_SIZE;
745                                 break;
746
747                         default:
748                                 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
749                                         format, i, format[i]);
750                                 exit(1);
751                 }
752         }
753 #if ((DEBUG) & DEBUG_F)
754         my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
755 #endif
756         return p;
757 }
758
759 void setuser() {
760         /*
761         Workaround for clone()-based threads
762         Try to change EUID independently of main thread
763         */
764         if (pw) {
765                 setgroups(0, NULL);
766                 setregid(pw->pw_gid, pw->pw_gid);
767                 setreuid(pw->pw_uid, pw->pw_uid);
768         }
769 }
770
771 void *emit_thread()
772 {
773         struct Flow *flow;
774         void *p;
775         struct timeval now;
776         struct timespec timeout;
777         int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
778
779         p = (void *) &emit_packet + netflow->HeaderSize;
780         timeout.tv_nsec = 0;
781
782         setuser();
783
784         for (;;) {
785                 pthread_mutex_lock(&emit_mutex);
786                 while (!flows_emit) {
787                         gettimeofday(&now, 0);
788                         timeout.tv_sec = now.tv_sec + emit_timeout;
789                         /* Do not wait until emit_packet will filled - it may be too long */
790                         if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
791                                 pthread_mutex_unlock(&emit_mutex);
792                                 goto sendit;
793                         }
794                 }
795                 flow = flows_emit;
796                 flows_emit = flows_emit->next;
797 #if ((DEBUG) & DEBUG_I)
798                 emit_queue--;
799 #endif          
800                 pthread_mutex_unlock(&emit_mutex);
801
802 #ifdef UPTIME_TRICK
803                 if (!emit_count) {
804                         gettime(&start_time);
805                         start_time.sec -= start_time_offset;
806                 }
807 #endif
808                 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
809                 mem_free(flow);
810                 emit_count++;
811 #ifdef PF2_DEBUG
812                 printf("Emit count = %d\n", emit_count);
813                 fflush(stdout);
814 #endif
815                 if (emit_count == netflow->MaxFlows) {
816                 sendit:
817                         gettime(&emit_time);
818                         p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
819                         size = netflow->HeaderSize + emit_count * netflow->FlowSize;
820                         /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
821 #ifdef STD_NETFLOW_PDU
822                         if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
823 #endif
824                         peer_rot_cur = 0;
825                         for (i = 0; i < npeers; i++) {
826                                 if (peers[i].type == PEER_FILE) {
827                                                 if (netflow->SeqOffset)
828                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
829                                                 peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
830                                                 ret = write(peers[i].write_fd, emit_packet, size);
831                                                 if (ret < size) {
832
833 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
834                                                         my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
835                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
836 #endif
837 #undef MESSAGES
838                                                 }
839 #if ((DEBUG) & DEBUG_E)
840                                                 commaneelse {
841                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
842                                                                 emit_count, i + 1, peers[i].seq);
843                                                 }
844 #endif
845                                                 peers[i].seq += emit_count;
846
847                                                 /* Rate limit */
848                                                 if (emit_rate_bytes) {
849                                                         sent += size;
850                                                         delay = sent / emit_rate_bytes;
851                                                         if (delay) {
852                                                                 sent %= emit_rate_bytes;
853                                                                 timeout.tv_sec = 0;
854                                                                 timeout.tv_nsec = emit_rate_delay * delay;
855                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
856                                                         }
857                                                 }
858                                         }
859                                 else
860                                 if (peers[i].type == PEER_MIRROR) goto sendreal;
861                                 else
862                                 if (peers[i].type == PEER_ROTATE) 
863                                         if (peer_rot_cur++ == peer_rot_work) {
864                                         sendreal:
865                                                 if (netflow->SeqOffset)
866                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
867                                                 ret = send(peers[i].write_fd, emit_packet, size, 0);
868                                                 if (ret < size) {
869 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
870                                                         my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
871                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
872 #endif
873                                                 }
874 #if ((DEBUG) & DEBUG_E)
875                                                 commaneelse {
876                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
877                                                                 emit_count, i + 1, peers[i].seq);
878                                                 }
879 #endif
880                                                 peers[i].seq += emit_count;
881
882                                                 /* Rate limit */
883                                                 if (emit_rate_bytes) {
884                                                         sent += size;
885                                                         delay = sent / emit_rate_bytes;
886                                                         if (delay) {
887                                                                 sent %= emit_rate_bytes;
888                                                                 timeout.tv_sec = 0;
889                                                                 timeout.tv_nsec = emit_rate_delay * delay;
890                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
891                                                         }
892                                                 }
893                                         }
894                         }
895                         if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
896                         emit_sequence += emit_count;
897                         emit_count = 0;
898 #if ((DEBUG) & DEBUG_I)
899                         emit_pkts++;
900 #endif
901                 }
902         }
903 }       
904
905 void *unpending_thread()
906 {
907         struct timeval now;
908         struct timespec timeout;
909 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
910         char logbuf[256];
911 #endif
912
913         setuser();
914
915         timeout.tv_nsec = 0;
916         pthread_mutex_lock(&unpending_mutex);
917
918         for (;;) {
919                 while (!(pending_tail->flags & FLOW_PENDING)) {
920                         gettimeofday(&now, 0);
921                         timeout.tv_sec = now.tv_sec + unpending_timeout;
922                         pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
923                 }
924
925 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
926                 *logbuf = 0;
927 #endif
928                 if (put_into(pending_tail, COPY_INTO
929 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
930                                 , logbuf
931 #endif
932                         ) < 0) {
933 #if ((DEBUG) & DEBUG_I)
934                         pkts_lost_unpending++;
935 #endif                          
936                 }
937
938 #if ((DEBUG) & DEBUG_U)
939                 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
940 #endif
941
942                 pending_tail->flags = 0;
943                 pending_tail = pending_tail->next;
944 #if ((DEBUG) & DEBUG_I)
945                 pkts_pending_done++;
946 #endif
947         }
948 }
949
950 void *scan_thread()
951 {
952 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
953         char logbuf[256];
954 #endif
955         int i;
956         struct Flow *flow, **flowpp;
957         struct Time now;
958         struct timespec timeout;
959
960         setuser();
961
962         timeout.tv_nsec = 0;
963         pthread_mutex_lock(&scan_mutex);
964
965         for (;;) {
966                 gettime(&now);
967                 timeout.tv_sec = now.sec + scan_interval;
968                 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
969
970                 gettime(&now);
971 #if ((DEBUG) & DEBUG_S)
972                 my_log(LOG_DEBUG, "S: %d", now.sec);
973 #endif
974                 for (i = 0; i < 1 << HASH_BITS ; i++) {
975                         pthread_mutex_lock(&flows_mutex[i]);
976                         flow = flows[i];
977                         flowpp = &flows[i];
978                         while (flow) {
979                                 if (flow->flags & FLOW_FRAG) {
980                                         /* Process fragmented flow */
981                                         if ((now.sec - flow->mtime.sec) > frag_lifetime) {
982                                                 /* Fragmented flow expired - put it into special chain */
983 #if ((DEBUG) & DEBUG_I)
984                                                 flows_fragmented--;
985                                                 flows_total--;
986 #endif
987                                                 *flowpp = flow->next;
988                                                 flow->id = 0;
989                                                 flow->flags &= ~FLOW_FRAG;
990                                                 flow->next = scan_frag_dreg;
991                                                 scan_frag_dreg = flow;
992                                                 flow = *flowpp;
993                                                 continue;
994                                         }
995                                 } else {
996                                         /* Flow is not frgamented */
997                                         if ((now.sec - flow->mtime.sec) > inactive_lifetime
998                                                 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
999                                                 /* Flow expired */
1000 #if ((DEBUG) & DEBUG_S)
1001                                                 my_log(LOG_DEBUG, "S: E %x", flow);
1002 #endif
1003 #if ((DEBUG) & DEBUG_I)
1004                                                 flows_total--;
1005 #endif
1006                                                 *flowpp = flow->next;
1007                                                 pthread_mutex_lock(&emit_mutex);
1008                                                 flow->next = flows_emit;
1009                                                 flows_emit = flow;
1010 #if ((DEBUG) & DEBUG_I)
1011                                                 emit_queue++;
1012 #endif                          
1013                                                 pthread_mutex_unlock(&emit_mutex);
1014                                                 flow = *flowpp;
1015                                                 continue;
1016                                         }
1017                                 }
1018                                 flowpp = &flow->next;
1019                                 flow = flow->next;
1020                         } /* chain loop */
1021                         pthread_mutex_unlock(&flows_mutex[i]);
1022                 } /* hash loop */
1023                 if (flows_emit) pthread_cond_signal(&emit_cond);
1024
1025                 while (scan_frag_dreg) {
1026                         flow = scan_frag_dreg;
1027                         scan_frag_dreg = flow->next;
1028 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1029                         *logbuf = 0;
1030 #endif
1031                         put_into(flow, MOVE_INTO
1032 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1033                                 , logbuf
1034 #endif
1035                         );
1036 #if ((DEBUG) & DEBUG_S)
1037                         my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
1038 #endif
1039                 }
1040         }
1041 }
1042
1043 void *cap_thread()
1044 {
1045         struct ulog_packet_msg *ulog_msg;
1046         struct ip *nl;
1047         void *tl;
1048         struct Flow *flow;
1049         int len, off_frag, psize;
1050 #if ((DEBUG) & DEBUG_C)
1051         char buf[64];
1052         char logbuf[256];
1053 #endif
1054
1055         setuser();
1056
1057         while (!killed) {
1058                 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1059                 if (len <= 0) {
1060                         my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1061                         continue;
1062                 }
1063                 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1064
1065 #if ((DEBUG) & DEBUG_C)
1066                         sprintf(logbuf, "C: %d", ulog_msg->data_len);
1067 #endif
1068
1069                         nl = (void *) &ulog_msg->payload;
1070                         psize = ulog_msg->data_len;
1071
1072                         /* Sanity check */
1073                         if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1074 #if ((DEBUG) & DEBUG_C)
1075                                 strcat(logbuf, " U");
1076                                 my_log(LOG_DEBUG, "%s", logbuf);
1077 #endif
1078 #if ((DEBUG) & DEBUG_I)
1079                                 pkts_ignored++;
1080 #endif
1081                                 continue;
1082                         }
1083
1084                         if (pending_head->flags) {
1085 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1086                                 my_log(LOG_ERR,
1087 # if ((DEBUG) & DEBUG_C)
1088                                         "%s %s %s", logbuf,
1089 # else
1090                                         "%s %s",
1091 # endif
1092                                         "pending queue full:", "packet lost");
1093 #endif
1094 #if ((DEBUG) & DEBUG_I)
1095                                 pkts_lost_capture++;
1096 #endif
1097                                 goto done;
1098                         }
1099
1100 #if ((DEBUG) & DEBUG_I)
1101                         pkts_total++;
1102 #endif
1103
1104                         flow = pending_head;
1105
1106                         /* ?FIXME? Add sanity check for ip_len? */
1107                         flow->size = ntohs(nl->ip_len);
1108 #if ((DEBUG) & DEBUG_I)
1109                         size_total += flow->size;
1110 #endif
1111
1112                         flow->sip = nl->ip_src;
1113                         flow->dip = nl->ip_dst;
1114                         if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
1115                                 my_log(LOG_INFO, "Received test flow to corewars.org");
1116                         }
1117                         flow->iif = snmp_index(ulog_msg->indev_name);
1118                         flow->oif = snmp_index(ulog_msg->outdev_name);
1119                         flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1120                         flow->proto = nl->ip_p;
1121                         flow->id = 0;
1122                         flow->tcp_flags = 0;
1123                         flow->pkts = 1;
1124                         flow->sizeF = 0;
1125                         flow->sizeP = 0;
1126                         /* Packets captured from OUTPUT table didn't contains valid timestamp */
1127                         if (ulog_msg->timestamp_sec) {
1128                                 flow->ctime.sec = ulog_msg->timestamp_sec;
1129                                 flow->ctime.usec = ulog_msg->timestamp_usec;
1130                         } else gettime(&flow->ctime);
1131                         flow->mtime = flow->ctime;
1132
1133                         off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1134
1135                         /*
1136                         Offset (from network layer) to transport layer header/IP data
1137                         IOW IP header size ;-)
1138
1139                         ?FIXME?
1140                         Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1141                         */
1142                         off_tl = nl->ip_hl << 2;
1143                         tl = (void *) nl + off_tl;
1144
1145                         /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1146                         flow->sizeF = ntohs(nl->ip_len) - off_tl;
1147                         psize -= off_tl;
1148                         if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1149                         if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1150
1151                         if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1152                                 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1153 #if ((DEBUG) & DEBUG_C)
1154                                 strcat(logbuf, " F");
1155 #endif
1156 #if ((DEBUG) & DEBUG_I)
1157                                 pkts_total_fragmented++;
1158 #endif
1159                                 flow->flags |= FLOW_FRAG;
1160                                 flow->id = nl->ip_id;
1161
1162                                 if (!(ntohs(nl->ip_off) & IP_MF)) {
1163                                         /* Packet whith IP_MF contains information about whole datagram size */
1164                                         flow->flags |= FLOW_LASTFRAG;
1165                                         /* size = frag_offset*8 + data_size */
1166                                         flow->sizeP = off_frag + flow->sizeF;
1167                                 }
1168                         }
1169
1170 #if ((DEBUG) & DEBUG_C)
1171                         sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1172                         strcat(logbuf, buf);
1173                         sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1174                         strcat(logbuf, buf);
1175 #endif
1176
1177                         /*
1178                         Fortunately most interesting transport layer information fit
1179                         into first 8 bytes of IP data field (minimal nonzero size).
1180                         Thus we don't need actual packet reassembling to build whole
1181                         transport layer data. We only check the fragment offset for
1182                         zero value to find packet with this information.
1183                         */
1184                         if (!off_frag && psize >= 8) {
1185                                 switch (flow->proto) {
1186                                         case IPPROTO_TCP:
1187                                         case IPPROTO_UDP:
1188                                                 flow->sp = ((struct udphdr *)tl)->uh_sport;
1189                                                 flow->dp = ((struct udphdr *)tl)->uh_dport;
1190                                                 goto tl_known;
1191
1192 #ifdef ICMP_TRICK
1193                                         case IPPROTO_ICMP:
1194                                                 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1195                                                 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1196                                                 goto tl_known;
1197 #endif
1198 #ifdef ICMP_TRICK_CISCO
1199                                         case IPPROTO_ICMP:
1200                                                 flow->dp = *((int32_t *) tl);
1201                                                 goto tl_known;
1202 #endif
1203
1204                                         default:
1205                                                 /* Unknown transport layer */
1206 #if ((DEBUG) & DEBUG_C)
1207                                                 strcat(logbuf, " U");
1208 #endif
1209                                                 flow->sp = 0;
1210                                                 flow->dp = 0;
1211                                                 break;
1212
1213                                         tl_known:
1214 #if ((DEBUG) & DEBUG_C)
1215                                                 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1216                                                 strcat(logbuf, buf);
1217 #endif
1218                                                 flow->flags |= FLOW_TL;
1219                                 }
1220                         }
1221
1222                         /* Check for tcp flags presence (including CWR and ECE). */
1223                         if (flow->proto == IPPROTO_TCP
1224                                 && off_frag < 16
1225                                 && psize >= 16 - off_frag) {
1226                                 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1227 #if ((DEBUG) & DEBUG_C)
1228                                 sprintf(buf, " TCP:%x", flow->tcp_flags);
1229                                 strcat(logbuf, buf);
1230 #endif
1231                         }
1232
1233 #if ((DEBUG) & DEBUG_C)
1234                         sprintf(buf, " => %x", (unsigned) flow);
1235                         strcat(logbuf, buf);
1236                         my_log(LOG_DEBUG, "%s", logbuf);
1237 #endif
1238
1239 #if ((DEBUG) & DEBUG_I)
1240                         pkts_pending++;
1241                         pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1242                         if (pending_queue_trace < pending_queue_trace_candidate)
1243                                 pending_queue_trace = pending_queue_trace_candidate;
1244 #endif
1245
1246                         /* Flow complete - inform unpending_thread() about it */
1247                         pending_head->flags |= FLOW_PENDING;
1248                         pending_head = pending_head->next;
1249                 done:
1250                         pthread_cond_signal(&unpending_cond);
1251                 }
1252         }
1253         return 0;
1254 }
1255
1256 /* Copied out of CoDemux */
1257
1258 static int init_daemon() {
1259   pid_t pid;
1260   FILE *pidfile;
1261
1262   pidfile = fopen(PIDFILE, "w");
1263   if (pidfile == NULL) {
1264     my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
1265   }
1266
1267   if ((pid = fork()) < 0) {
1268     fclose(pidfile);
1269     my_log(LOG_ERR, "Could not fork!\n");
1270     return(-1);
1271   }
1272   else if (pid != 0) {
1273     /* i'm the parent, writing down the child pid  */
1274     fprintf(pidfile, "%u\n", pid);
1275     fclose(pidfile);
1276     exit(0);
1277   }
1278
1279   /* close the pid file */
1280   fclose(pidfile);
1281
1282   /* routines for any daemon process
1283      1. create a new session 
1284      2. change directory to the root
1285      3. change the file creation permission 
1286   */
1287   setsid();
1288   chdir("/root");
1289   umask(0);
1290
1291   return(0);
1292 }
1293
1294 int main(int argc, char **argv)
1295 {
1296         char errpbuf[512];
1297         char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1298         int c, i, write_fd, memory_limit = 0;
1299         struct addrinfo hints, *res;
1300         struct sockaddr_in saddr;
1301         pthread_attr_t tattr;
1302         struct sigaction sigact;
1303         static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1304         struct timeval timeout;
1305
1306         sched_min = sched_get_priority_min(SCHED);
1307         sched_max = sched_get_priority_max(SCHED);
1308
1309         memset(&saddr, 0 , sizeof(saddr));
1310         memset(&hints, 0 , sizeof(hints));
1311         hints.ai_flags = AI_PASSIVE;
1312         hints.ai_family = AF_INET;
1313         hints.ai_socktype = SOCK_DGRAM;
1314
1315         /* Process command line options */
1316
1317         opterr = 0;
1318         while ((c = my_getopt(argc, argv, parms)) != -1) {
1319                 switch (c) {
1320                         case '?':
1321                                 usage();
1322
1323                         case 'h':
1324                                 usage();
1325                 }
1326         }
1327
1328         if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1329         if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1330         if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1331         if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1332         if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1333         if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1334         if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1335         if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1336         if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1337         if (parms[nflag].count) {
1338                 switch (atoi(parms[nflag].arg)) {
1339                         case 1:
1340                                 netflow = &NetFlow1;
1341                                 break;
1342
1343                         case 5:
1344                                 break;
1345
1346                         case 7:
1347                                 netflow = &NetFlow7;
1348                                 break;
1349
1350                         default:
1351                                 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1352                                 exit(1);
1353                 }
1354         }
1355         if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1356         if (parms[lflag].count) {
1357                 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1358                         *log_suffix++ = 0;
1359                         if (*log_suffix) {
1360                                 sprintf(errpbuf, "[%s]", log_suffix);
1361                                 strcat(ident, errpbuf);
1362                         }
1363                 }
1364                 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1365                 if (log_suffix) *--log_suffix = ':';
1366         }
1367         if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1368         err_malloc:
1369                 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1370                 exit(1);
1371         }
1372         sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1373         if (parms[qflag].count) {
1374                 pending_queue_length = atoi(parms[qflag].arg);
1375                 if (pending_queue_length < 1) {
1376                         fprintf(stderr, "Illegal %s\n", "pending queue length");
1377                         exit(1);
1378                 }
1379         }
1380         if (parms[rflag].count) {
1381                 schedp.sched_priority = atoi(parms[rflag].arg);
1382                 if (schedp.sched_priority
1383                         && (schedp.sched_priority < sched_min
1384                                 || schedp.sched_priority > sched_max)) {
1385                         fprintf(stderr, "Illegal %s\n", "realtime priority");
1386                         exit(1);
1387                 }
1388         }
1389         if (parms[Bflag].count) {
1390                 sockbufsize = atoi(parms[Bflag].arg) << 10;
1391         }
1392         if (parms[bflag].count) {
1393                 bulk_quantity = atoi(parms[bflag].arg);
1394                 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1395                         fprintf(stderr, "Illegal %s\n", "bulk size");
1396                         exit(1);
1397                 }
1398         }
1399         if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1400         if (parms[Xflag].count) {
1401                 for(i = 0; parms[Xflag].arg[i]; i++)
1402                         if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1403                 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1404                         goto err_malloc;
1405                 rule = strtok(parms[Xflag].arg, ":");
1406                 for (i = 0; rule; i++) {
1407                         snmp_rules[i].len = strlen(rule);
1408                         if (snmp_rules[i].len > IFNAMSIZ) {
1409                                 fprintf(stderr, "Illegal %s\n", "interface basename");
1410                                 exit(1);
1411                         }
1412                         strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1413                         if (!*(rule - 1)) *(rule - 1) = ',';
1414                         rule = strtok(NULL, ",");
1415                         if (!rule) {
1416                                 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1417                                 exit(1);
1418                         }
1419                         snmp_rules[i].base = atoi(rule);
1420                         *(rule - 1) = ':';
1421                         rule = strtok(NULL, ":");
1422                 }
1423                 nsnmp_rules = i;
1424         }
1425         if (parms[tflag].count)
1426                 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1427         if (parms[aflag].count) {
1428                 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1429                 bad_lhost:
1430                         fprintf(stderr, "Illegal %s\n", "source address");
1431                         exit(1);
1432                 } else {
1433                         saddr = *((struct sockaddr_in *) res->ai_addr);
1434                         freeaddrinfo(res);
1435                 }
1436         }
1437         if (parms[uflag].count) 
1438                 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1439                         fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1440                         exit(1);
1441                 }
1442
1443
1444         /* Process collectors parameters. Brrrr... :-[ */
1445
1446         npeers = argc - optind;
1447         if (npeers > 1) {
1448                 /* Send to remote Netflow collector */
1449                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1450                 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1451                         dhost = argv[i];
1452                         if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1453                         *dport++ = 0;
1454                         if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1455                                 fprintf(stderr, "socket(): %s\n", strerror(errno));
1456                                 exit(1);
1457                         }
1458                         peers[npeers].write_fd = write_fd;
1459                         peers[npeers].type = PEER_MIRROR;
1460                         peers[npeers].laddr = saddr;
1461                         peers[npeers].seq = 0;
1462                         if ((lhost = strchr(dport, '/'))) {
1463                                 *lhost++ = 0;
1464                                 if ((type = strchr(lhost, '/'))) {
1465                                         *type++ = 0;
1466                                         switch (*type) {
1467                                                 case 0:
1468                                                 case 'm':
1469                                                         break;
1470
1471                                                 case 'r':
1472                                                         peers[npeers].type = PEER_ROTATE;
1473                                                         npeers_rot++;
1474                                                         break;
1475
1476                                                 default:
1477                                                         goto bad_collector;
1478                                         }
1479                                 }
1480                                 if (*lhost) {
1481                                         if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1482                                         peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1483                                         freeaddrinfo(res);
1484                                 }
1485                         }
1486                         if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1487                                                 sizeof(struct sockaddr_in))) {
1488                                 fprintf(stderr, "bind(): %s\n", strerror(errno));
1489                                 exit(1);
1490                         }
1491                         if (getaddrinfo(dhost, dport, &hints, &res)) {
1492 bad_collector:
1493                                 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1494                                 exit(1);
1495                         }
1496                         peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1497                         freeaddrinfo(res);
1498                         if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1499                                                 sizeof(struct sockaddr_in))) {
1500                                 fprintf(stderr, "connect(): %s\n", strerror(errno));
1501                                 exit(1);
1502                         }
1503
1504                         /* Restore command line */
1505                         if (type) *--type = '/';
1506                         if (lhost) *--lhost = '/';
1507                         *--dport = ':';
1508                 }
1509         }
1510         else if (parms[fflag].count) {
1511                 // log into a file
1512                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1513                 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1514                 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1515                 
1516                 peers[npeers].write_fd = START_VALUE;
1517                 peers[npeers].type = PEER_FILE;
1518                 peers[npeers].seq = 0;
1519
1520                 get_cur_epoch();
1521                 npeers++;
1522         }
1523         else 
1524                 usage();
1525
1526
1527         if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1528         ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1529         if (!ulog_handle) {
1530                 fprintf(stderr, "libipulog initialization error: %s",
1531                         ipulog_strerror(ipulog_errno));
1532                 exit(1);
1533         }
1534         if (sockbufsize)
1535                 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1536                         &sockbufsize, sizeof(sockbufsize)) < 0)
1537                         fprintf(stderr, "setsockopt(): %s", strerror(errno));
1538
1539         /* Daemonize (if log destination stdout-free) */
1540
1541         my_log_open(ident, verbosity, log_dest);
1542
1543         init_daemon();
1544
1545         if (!(log_dest & 2)) {
1546                 /* Crash-proofing - Sapan*/
1547                 while (1) {
1548                         int pid=fork();
1549                         if (pid==-1) {
1550                                         fprintf(stderr, "fork(): %s", strerror(errno));
1551                                         exit(1);
1552                         }
1553                         else if (pid==0) {
1554                                         setsid();
1555                                         freopen("/dev/null", "r", stdin);
1556                                         freopen("/dev/null", "w", stdout);
1557                                         freopen("/dev/null", "w", stderr);
1558                                         break;
1559                         }
1560                         else {
1561                                 while (wait3(NULL,0,NULL) < 1);
1562                         }
1563                 }
1564         } else {
1565                 setvbuf(stdout, (char *)0, _IONBF, 0);
1566                 setvbuf(stderr, (char *)0, _IONBF, 0);
1567         }
1568
1569         pid = getpid();
1570         sprintf(errpbuf, "[%ld]", (long) pid);
1571         strcat(ident, errpbuf);
1572
1573         /* Initialization */
1574
1575         hash_init(); /* Actually for crc16 only */
1576         mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1577         for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1578
1579 #ifdef UPTIME_TRICK
1580         /* Hope 12 days is enough :-/ */
1581         start_time_offset = 1 << 20;
1582
1583         /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1584 #endif
1585         gettime(&start_time);
1586
1587         /*
1588         Build static pending queue as circular buffer.
1589         */
1590         if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1591         pending_tail = pending_head;
1592         for (i = pending_queue_length - 1; i--;) {
1593                 if (!(pending_tail->next = mem_alloc())) {
1594                 err_mem_alloc:
1595                         my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1596                         exit(1);
1597                 }
1598                 pending_tail = pending_tail->next;
1599         }
1600         pending_tail->next = pending_head;
1601         pending_tail = pending_head;
1602
1603         sigemptyset(&sig_mask);
1604         sigact.sa_handler = &sighandler;
1605         sigact.sa_mask = sig_mask;
1606         sigact.sa_flags = 0;
1607         sigaddset(&sig_mask, SIGTERM);
1608         sigaction(SIGTERM, &sigact, 0);
1609 #if ((DEBUG) & DEBUG_I)
1610         sigaddset(&sig_mask, SIGUSR1);
1611         sigaction(SIGUSR1, &sigact, 0);
1612 #endif
1613         if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1614                 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1615                 exit(1);
1616         }
1617
1618         my_log(LOG_INFO, "Starting %s...", VERSION);
1619
1620         if (parms[cflag].count) {
1621                 if (chdir(parms[cflag].arg) || chroot(".")) {
1622                         my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1623                         exit(1);
1624                 }
1625         }
1626
1627         schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1628         pthread_attr_init(&tattr);
1629         for (i = 0; i < THREADS - 1; i++) {
1630                 if (schedp.sched_priority > 0) {
1631                         if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1632                                 (pthread_attr_setschedparam(&tattr, &schedp))) {
1633                                 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1634                                 exit(1);
1635                         }
1636                 }
1637                 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1638                         my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1639                         exit(1);
1640                 }
1641                 pthread_detach(thid);
1642                 schedp.sched_priority++;
1643         }
1644
1645         if (pw) {
1646                 if (setgroups(0, NULL)) {
1647                         my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1648                         exit(1);
1649                 }
1650                 if (setregid(pw->pw_gid, pw->pw_gid)) {
1651                         my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1652                         exit(1);
1653                 }
1654                 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1655                         my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1656                         exit(1);
1657                 }
1658         }
1659
1660         if (!(pidfile = fopen(pidfilepath, "w")))
1661                 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1662         else {
1663                 fprintf(pidfile, "%ld\n", (long) pid);
1664                 fclose(pidfile);
1665         }
1666
1667         my_log(LOG_INFO, "pid: %d", pid);
1668         my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1669                 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1670                 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1671                 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1672                 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1673                 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1674                 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1675         for (i = 0; i < nsnmp_rules; i++) {
1676                 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1677                         i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1678         }
1679         for (i = 0; i < npeers; i++) {
1680                 switch (peers[i].type) {
1681                         case PEER_MIRROR:
1682                                 c = 'm';
1683                                 break;
1684                         case PEER_ROTATE:
1685                                 c = 'r';
1686                                 break;
1687                 }
1688                 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1689                 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1690                         inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1691         }
1692
1693         pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1694
1695         timeout.tv_usec = 0;
1696         while (!killed
1697                 || (total_elements - free_elements - pending_queue_length)
1698                 || emit_count
1699                 || pending_tail->flags) {
1700
1701                 if (!sigs) {
1702                         timeout.tv_sec = scan_interval;
1703                         select(0, 0, 0, 0, &timeout);
1704                 }
1705
1706                 if (sigs & SIGTERM_MASK && !killed) {
1707                         sigs &= ~SIGTERM_MASK;
1708                         my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1709                         scan_interval = 1;
1710                         frag_lifetime = -1;
1711                         active_lifetime = -1;
1712                         inactive_lifetime = -1;
1713                         emit_timeout = 1;
1714                         unpending_timeout = 1;
1715                         killed = 1;
1716                         pthread_cond_signal(&scan_cond);
1717                         pthread_cond_signal(&unpending_cond);
1718                 }
1719
1720 #if ((DEBUG) & DEBUG_I)
1721                 if (sigs & SIGUSR1_MASK) {
1722                         sigs &= ~SIGUSR1_MASK;
1723                         info_debug();
1724                 }
1725 #endif
1726         }
1727         remove(pidfilepath);
1728 #if ((DEBUG) & DEBUG_I)
1729         info_debug();
1730 #endif
1731         my_log(LOG_INFO, "Done.");
1732 #ifdef WALL
1733         return 0;
1734 #endif
1735 }