Bug fixes.
[fprobe-ulog.git] / src / fprobe-ulog.c
1 /*
2         Copyright (C) Slava Astashonok <sla@0n.ru>
3
4         This program is free software; you can redistribute it and/or
5         modify it under the terms of the GNU General Public License.
6
7         $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
8
9         7/11/2007       Sapan Bhatia <sapanb@cs.princeton.edu> 
10                         
11                 Added data collection (-f) functionality, xid support in the header and log file
12                 rotation.
13 */
14
15 #include <common.h>
16
17 /* stdout, stderr, freopen() */
18 #include <stdio.h>
19
20 /* atoi(), exit() */
21 #include <stdlib.h>
22
23 /* getopt(), alarm(), getpid(), setsid(), chdir() */
24 #include <unistd.h>
25
26 /* strerror() */
27 #include <string.h>
28
29 /* sig*() */
30 #include <signal.h>
31
32 #include <libipulog/libipulog.h>
33 struct ipulog_handle {
34         int fd;
35         u_int8_t blocking;
36         struct sockaddr_nl local;
37         struct sockaddr_nl peer;
38         struct nlmsghdr* last_nlhdr;
39 };
40
41 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
42 #include <sys/types.h>
43 #include <netinet/in_systm.h>
44 #include <sys/socket.h>
45 #include <netinet/in.h>
46 #include <arpa/inet.h>
47 #include <netinet/ip.h>
48 #include <netinet/tcp.h>
49 #include <netinet/udp.h>
50 #include <netinet/ip_icmp.h>
51 #include <net/if.h>
52
53 #include <sys/param.h>
54 #include <pwd.h>
55 #ifdef OS_LINUX
56 #include <grp.h>
57 #endif
58
59 /* pthread_*() */
60 #include <pthread.h>
61
62 /* errno */
63 #include <errno.h>
64
65 /* getaddrinfo() */
66 #include <netdb.h>
67
68 /* nanosleep() */
69 #include <time.h>
70
71 /* gettimeofday() */
72 #include <sys/time.h>
73
74 /* scheduling */
75 #include <sched.h>
76
77 /* select() (POSIX)*/
78 #include <sys/select.h>
79
80 /* open() */
81 #include <sys/stat.h>
82 #include <fcntl.h>
83
84 #include <fprobe-ulog.h>
85 #include <my_log.h>
86 #include <my_getopt.h>
87 #include <netflow.h>
88 #include <hash.h>
89 #include <mem.h>
90
/*
 * Positions of the command-line options inside parms[].
 * The enumerators are listed in the same order as the rows of that table,
 * so parms[Xflag] is the row for option 'X'.
 */
enum {
	aflag = 0,
	Bflag, bflag, cflag, dflag,
	eflag, Eflag, fflag, gflag,
	hflag, lflag, mflag, Mflag,
	nflag, qflag, rflag, sflag,
	tflag, Tflag, Uflag, uflag,
	vflag, Xflag
};
116
117 static struct getopt_parms parms[] = {
118         {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
119         {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
120         {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
121         {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
122         {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
123         {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
124         {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
125         {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
126         {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
127         {'h', 0, 0, 0},
128         {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
129         {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
130         {'M', 0, 0, 0},
131         {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
132         {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
133         {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
134         {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
135         {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
136         {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
137         {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
138         {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
139         {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
140         {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
141         {0, 0, 0, 0}
142 };
143
/* Option-parsing state provided by the C library. */
extern char *optarg;
extern int optind, opterr, optopt;
/*
 * errno is deliberately NOT declared here: <errno.h> (included above) is the
 * only valid source of it, and it may be a macro expanding to a per-thread
 * lvalue rather than a plain int object.
 */

/* NetFlow export format descriptors, defined outside this file. */
extern struct NetFlow NetFlow1;
extern struct NetFlow NetFlow5;
extern struct NetFlow NetFlow7;
151
152 #define mark_is_tos parms[Mflag].count
153 static unsigned scan_interval = 5;
154 static int frag_lifetime = 30;
155 static int inactive_lifetime = 60;
156 static int active_lifetime = 300;
157 static int sockbufsize;
158 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
159 #if (MEM_BITS == 0) || (MEM_BITS == 16)
160 #define BULK_QUANTITY 10000
161 #else
162 #define BULK_QUANTITY 200
163 #endif
164
165 static unsigned epoch_length=60, log_epochs=1; 
166 static unsigned cur_epoch=0,prev_uptime=0;
167
168 static unsigned bulk_quantity = BULK_QUANTITY;
169 static unsigned pending_queue_length = 100;
170 static struct NetFlow *netflow = &NetFlow5;
171 static unsigned verbosity = 6;
172 static unsigned log_dest = MY_LOG_SYSLOG;
173 static struct Time start_time;
174 static long start_time_offset;
175 static int off_tl;
/* Allocator statistics exported by mem.c */
extern unsigned total_elements, free_elements, total_memory;
#if ((DEBUG) & DEBUG_I)
/* Statistics kept only in DEBUG_I builds; info_debug() prints them. */
static unsigned emit_pkts;
static unsigned emit_queue;
static uint64_t size_total;
static unsigned pkts_total;
static unsigned pkts_total_fragmented;
static unsigned pkts_ignored;
static unsigned pkts_lost_capture;
static unsigned pkts_lost_unpending;
static unsigned pkts_pending;
static unsigned pkts_pending_done;
static unsigned pending_queue_trace;
static unsigned pending_queue_trace_candidate;
static unsigned flows_total;
static unsigned flows_fragmented;
#endif
189 static unsigned emit_count;
190 static uint32_t emit_sequence;
191 static unsigned emit_rate_bytes, emit_rate_delay;
192 static struct Time emit_time;
193 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
194 static pthread_t thid;
195 static sigset_t sig_mask;
196 static struct sched_param schedp;
197 static int sched_min, sched_max;
198 static int npeers, npeers_rot;
199 static struct peer *peers;
200 static int sigs;
201
202 static struct Flow *flows[1 << HASH_BITS];
203 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
204
205 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
206 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
207
208 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
209 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
210 static struct Flow *pending_head, *pending_tail;
211 static struct Flow *scan_frag_dreg;
212
213 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
214 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
215 static struct Flow *flows_emit;
216
217 static char ident[256] = "fprobe-ulog";
218 static FILE *pidfile;
219 static char *pidfilepath;
220 static pid_t pid;
221 static int killed;
222 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
223 static struct ipulog_handle *ulog_handle;
224 static uint32_t ulog_gmask = 1;
225 static char *cap_buf;
226 static int nsnmp_rules;
227 static struct snmp_rule *snmp_rules;
228 static struct passwd *pw = 0;
229
230 void usage()
231 {
232         fprintf(stdout,
233                 "fprobe-ulog: a NetFlow probe. Version %s\n"
234                 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
235                 "\n"
236                 "-h\t\tDisplay this help\n"
237                 "-U <mask>\tULOG group bitwise mask [1]\n"
238                 "-s <seconds>\tHow often scan for expired flows [5]\n"
239                 "-g <seconds>\tFragmented flow lifetime [30]\n"
240                 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
241                 "-f <filename>\tLog flow data in a file\n"
242                 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
243                 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
244                 "-a <address>\tUse <address> as source for NetFlow flow\n"
245                 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
246                 "-M\t\tUse netfilter mark value as ToS flag\n"
247                 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
248                 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
249                 "-q <flows>\tPending queue length [100]\n"
250                 "-B <kilobytes>\tKernel capture buffer size [0]\n"
251                 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
252                 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
253                 "-c <directory>\tDirectory to chroot to\n"
254                 "-u <user>\tUser to run as\n"
255                 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
256                 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
257                 "-y <remote:port>\tAddress of the NetFlow collector\n"
258                 "-f <writable file>\tFile to write data into\n"
259                 "-T <n>\tRotate log file every n epochs\n"
260                 "-E <[1..60]>\tSize of an epoch in minutes\n",
261                 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
262         exit(0);
263 }
264
#if ((DEBUG) & DEBUG_I)
/*
 * Write the DEBUG_I statistics counters to the log at LOG_DEBUG level.
 *
 * Every counter is unsigned, so the conversions are %u (and %llu for the
 * 64-bit byte total) instead of the signed %d/%lld, which would print
 * large values as negative numbers.
 */
void info_debug()
{
	my_log(LOG_DEBUG, "I: received:%u/%u (%llu) pending:%u/%u",
		pkts_total, pkts_total_fragmented, (unsigned long long) size_total,
		pkts_pending - pkts_pending_done, pending_queue_trace);
	my_log(LOG_DEBUG, "I: ignored:%u lost:%u+%u",
		pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
	my_log(LOG_DEBUG, "I: cache:%u/%u emit:%u/%u/%u",
		flows_total, flows_fragmented, (unsigned) emit_sequence,
		emit_pkts, emit_queue);
	my_log(LOG_DEBUG, "I: memory:%u/%u (%u)",
		total_elements, free_elements, total_memory);
}
#endif
279
280 void sighandler(int sig)
281 {
282         switch (sig) {
283                 case SIGTERM:
284                         sigs |= SIGTERM_MASK;
285                         break;
286 #if ((DEBUG) & DEBUG_I)
287                 case SIGUSR1:
288                         sigs |= SIGUSR1_MASK;
289                         break;
290 #endif
291         }
292 }
293
294 void gettime(struct Time *now)
295 {
296         struct timeval t;
297
298         gettimeofday(&t, 0);
299         now->sec = t.tv_sec;
300         now->usec = t.tv_usec;
301 }
302
303
304 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
305 {
306         return (t1->sec - t2->sec)/60;
307 }
308
309 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
310 {
311         return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
312 }
313
314 /* Uptime in miliseconds */
315 uint32_t getuptime(struct Time *t)
316 {
317         /* Maximum uptime is about 49/2 days */
318         return cmpmtime(t, &start_time);
319 }
320
321 /* Uptime in minutes */
322 uint32_t getuptime_minutes(struct Time *t)
323 {
324         /* Maximum uptime is about 49/2 days */
325         return cmpMtime(t, &start_time);
326 }
327
328 hash_t hash_flow(struct Flow *flow)
329 {
330         if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
331         else return hash(flow, sizeof(struct Flow_TL));
332 }
333
334 uint16_t snmp_index(char *name) {
335         uint32_t i;
336
337         if (!*name) return 0;
338
339         for (i = 0; (int) i < nsnmp_rules; i++) {
340                 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
341                 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
342         }
343
344         if ((i = if_nametoindex(name))) return i;
345
346         return -1;
347 }
348
349 inline void copy_flow(struct Flow *src, struct Flow *dst)
350 {
351         dst->iif = src->iif;
352         dst->oif = src->oif;
353         dst->sip = src->sip;
354         dst->dip = src->dip;
355         dst->tos = src->tos;
356         dst->proto = src->proto;
357         dst->tcp_flags = src->tcp_flags;
358         dst->id = src->id;
359         dst->sp = src->sp;
360         dst->dp = src->dp;
361         dst->pkts = src->pkts;
362         dst->size = src->size;
363         dst->sizeF = src->sizeF;
364         dst->sizeP = src->sizeP;
365         dst->ctime = src->ctime;
366         dst->mtime = src->mtime;
367         dst->flags = src->flags;
368 }
369
370 unsigned get_log_fd(char *fname, unsigned cur_fd) {
371         struct Time now;
372         unsigned cur_uptime;
373         int ret_fd;
374         gettime(&now);
375         cur_uptime = getuptime_minutes(&now);
376
377         /* Epoch length in minutes */
378         if ((cur_uptime - prev_uptime) > epoch_length || cur_fd==-1) {
379                 char nextname[MAX_PATH_LEN];
380                 int write_fd;
381                 prev_uptime = cur_uptime;
382                 cur_epoch = (cur_epoch + 1) % log_epochs;
383                 close(cur_fd);
384                 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
385                 if ((write_fd = open(nextname, O_WRONLY|O_CREAT)) < 0) {
386                         fprintf(stderr, "open(): %s (%s)\n", nextname, strerror(errno));
387                         exit(1);
388                 }
389                 ret_fd = write_fd;
390         }
391         else
392                 ret_fd = cur_fd;
393         return(ret_fd);
394 }
395
396 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
397 {
398         struct Flow **flowpp;
399
400 #ifdef WALL
401         flowpp = 0;
402 #endif
403
404         if (prev) flowpp = *prev;
405
406         while (where) {
407                 if (where->sip.s_addr == what->sip.s_addr
408                         && where->dip.s_addr == what->dip.s_addr
409                         && where->proto == what->proto) {
410                         switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
411                                 case 0:
412                                         /* Both unfragmented */
413                                         if ((what->sp == where->sp)
414                                                 && (what->dp == where->dp)) goto done;
415                                         break;
416                                 case 2:
417                                         /* Both fragmented */
418                                         if (where->id == what->id) goto done;
419                                         break;
420                         }
421                 }
422                 flowpp = &where->next;
423                 where = where->next;
424         }
425 done:
426         if (prev) *prev = flowpp;
427         return where;
428 }
429
430 int put_into(struct Flow *flow, int flag
431 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
432         , char *logbuf
433 #endif
434 )
435 {
436         int ret = 0;
437         hash_t h;
438         struct Flow *flown, **flowpp;
439 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
440         char buf[64];
441 #endif
442
443         h = hash_flow(flow);
444 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
445         sprintf(buf, " %x H:%04x", (unsigned) flow, h);
446         strcat(logbuf, buf);
447 #endif
448         pthread_mutex_lock(&flows_mutex[h]);
449         flowpp = &flows[h];
450         if (!(flown = find(flows[h], flow, &flowpp))) {
451                 /* No suitable flow found - add */
452                 if (flag == COPY_INTO) {
453                         if ((flown = mem_alloc())) {
454                                 copy_flow(flow, flown);
455                                 flow = flown;
456                         } else {
457 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
458                                 my_log(LOG_ERR, "%s %s. %s",
459                                         "mem_alloc():", strerror(errno), "packet lost");
460 #endif
461                                 return -1;
462                         }
463                 }
464                 flow->next = flows[h];
465                 flows[h] = flow;
466 #if ((DEBUG) & DEBUG_I)
467                 flows_total++;
468                 if (flow->flags & FLOW_FRAG) flows_fragmented++;
469 #endif
470 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
471                 if (flown) {
472                         sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
473                         strcat(logbuf, buf);
474                 }
475 #endif
476         } else {
477                 /* Found suitable flow - update */
478 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
479                 sprintf(buf, " +> %x", (unsigned) flown);
480                 strcat(logbuf, buf);
481 #endif
482                 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
483                         flown->mtime = flow->mtime;
484                 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
485                         flown->ctime = flow->ctime;
486                 flown->tcp_flags |= flow->tcp_flags;
487                 flown->size += flow->size;
488                 flown->pkts += flow->pkts;
489                 if (flow->flags & FLOW_FRAG) {
490                         /* Fragmented flow require some additional work */
491                         if (flow->flags & FLOW_TL) {
492                                 /*
493                                 ?FIXME?
494                                 Several packets with FLOW_TL (attack)
495                                 */
496                                 flown->sp = flow->sp;
497                                 flown->dp = flow->dp;
498                         }
499                         if (flow->flags & FLOW_LASTFRAG) {
500                                 /*
501                                 ?FIXME?
502                                 Several packets with FLOW_LASTFRAG (attack)
503                                 */
504                                 flown->sizeP = flow->sizeP;
505                         }
506                         flown->flags |= flow->flags;
507                         flown->sizeF += flow->sizeF;
508                         if ((flown->flags & FLOW_LASTFRAG)
509                                 && (flown->sizeF >= flown->sizeP)) {
510                                 /* All fragments received - flow reassembled */
511                                 *flowpp = flown->next;
512                                 pthread_mutex_unlock(&flows_mutex[h]);
513 #if ((DEBUG) & DEBUG_I)
514                                 flows_total--;
515                                 flows_fragmented--;
516 #endif
517                                 flown->id = 0;
518                                 flown->flags &= ~FLOW_FRAG;
519 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
520                                 strcat(logbuf," R");
521 #endif
522                                 ret = put_into(flown, MOVE_INTO
523 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
524                                                 , logbuf
525 #endif
526                                         );
527                         }
528                 }
529                 if (flag == MOVE_INTO) mem_free(flow);
530         }
531         pthread_mutex_unlock(&flows_mutex[h]);
532         return ret;
533 }
534
535 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
536 {
537         int i;
538
539         for (i = 0; i < fields; i++) {
540 #if ((DEBUG) & DEBUG_F)
541                 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
542 #endif
543                 switch (format[i]) {
544                         case NETFLOW_IPV4_SRC_ADDR:
545                                 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
546                                 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
547                                 break;
548
549                         case NETFLOW_IPV4_DST_ADDR:
550                                 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
551                                 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
552                                         my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
553                                 }
554                                 p += NETFLOW_IPV4_DST_ADDR_SIZE;
555                                 break;
556
557                         case NETFLOW_INPUT_SNMP:
558                                 *((uint16_t *) p) = htons(flow->iif);
559                                 p += NETFLOW_INPUT_SNMP_SIZE;
560                                 break;
561
562                         case NETFLOW_OUTPUT_SNMP:
563                                 *((uint16_t *) p) = htons(flow->oif);
564                                 p += NETFLOW_OUTPUT_SNMP_SIZE;
565                                 break;
566
567                         case NETFLOW_PKTS_32:
568                                 *((uint32_t *) p) = htonl(flow->pkts);
569                                 p += NETFLOW_PKTS_32_SIZE;
570                                 break;
571
572                         case NETFLOW_BYTES_32:
573                                 *((uint32_t *) p) = htonl(flow->size);
574                                 p += NETFLOW_BYTES_32_SIZE;
575                                 break;
576
577                         case NETFLOW_FIRST_SWITCHED:
578                                 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
579                                 p += NETFLOW_FIRST_SWITCHED_SIZE;
580                                 break;
581
582                         case NETFLOW_LAST_SWITCHED:
583                                 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
584                                 p += NETFLOW_LAST_SWITCHED_SIZE;
585                                 break;
586
587                         case NETFLOW_L4_SRC_PORT:
588                                 *((uint16_t *) p) = flow->sp;
589                                 p += NETFLOW_L4_SRC_PORT_SIZE;
590                                 break;
591
592                         case NETFLOW_L4_DST_PORT:
593                                 *((uint16_t *) p) = flow->dp;
594                                 p += NETFLOW_L4_DST_PORT_SIZE;
595                                 break;
596
597                         case NETFLOW_PROT:
598                                 *((uint8_t *) p) = flow->proto;
599                                 p += NETFLOW_PROT_SIZE;
600                                 break;
601
602                         case NETFLOW_SRC_TOS:
603                                 *((uint8_t *) p) = flow->tos;
604                                 p += NETFLOW_SRC_TOS_SIZE;
605                                 break;
606
607                         case NETFLOW_TCP_FLAGS:
608                                 *((uint8_t *) p) = flow->tcp_flags;
609                                 p += NETFLOW_TCP_FLAGS_SIZE;
610                                 break;
611
612                         case NETFLOW_VERSION:
613                                 *((uint16_t *) p) = htons(netflow->Version);
614                                 p += NETFLOW_VERSION_SIZE;
615                                 break;
616
617                         case NETFLOW_COUNT:
618                                 *((uint16_t *) p) = htons(emit_count);
619                                 p += NETFLOW_COUNT_SIZE;
620                                 break;
621
622                         case NETFLOW_UPTIME:
623                                 *((uint32_t *) p) = htonl(getuptime(&emit_time));
624                                 p += NETFLOW_UPTIME_SIZE;
625                                 break;
626
627                         case NETFLOW_UNIX_SECS:
628                                 *((uint32_t *) p) = htonl(emit_time.sec);
629                                 p += NETFLOW_UNIX_SECS_SIZE;
630                                 break;
631
632                         case NETFLOW_UNIX_NSECS:
633                                 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
634                                 p += NETFLOW_UNIX_NSECS_SIZE;
635                                 break;
636
637                         case NETFLOW_FLOW_SEQUENCE:
638                                 //*((uint32_t *) p) = htonl(emit_sequence);
639                                 *((uint32_t *) p) = 0;
640                                 p += NETFLOW_FLOW_SEQUENCE_SIZE;
641                                 break;
642
643                         case NETFLOW_PAD8:
644                         /* Unsupported (uint8_t) */
645                         case NETFLOW_ENGINE_TYPE:
646                         case NETFLOW_ENGINE_ID:
647                         case NETFLOW_FLAGS7_1:
648                         case NETFLOW_SRC_MASK:
649                         case NETFLOW_DST_MASK:
650                                 *((uint8_t *) p) = 0;
651                                 p += NETFLOW_PAD8_SIZE;
652                                 break;
653                         case NETFLOW_XID:
654                                 *((uint16_t *) p) = flow->tos;
655                                 p += NETFLOW_XID_SIZE;
656                                 break;
657                         case NETFLOW_PAD16:
658                         /* Unsupported (uint16_t) */
659                         case NETFLOW_SRC_AS:
660                         case NETFLOW_DST_AS:
661                         case NETFLOW_FLAGS7_2:
662                                 *((uint16_t *) p) = 0;
663                                 p += NETFLOW_PAD16_SIZE;
664                                 break;
665
666                         case NETFLOW_PAD32:
667                         /* Unsupported (uint32_t) */
668                         case NETFLOW_IPV4_NEXT_HOP:
669                         case NETFLOW_ROUTER_SC:
670                                 *((uint32_t *) p) = 0;
671                                 p += NETFLOW_PAD32_SIZE;
672                                 break;
673
674                         default:
675                                 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
676                                         format, i, format[i]);
677                                 exit(1);
678                 }
679         }
680 #if ((DEBUG) & DEBUG_F)
681         my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
682 #endif
683         return p;
684 }
685
686 void setuser() {
687         /*
688         Workaround for clone()-based threads
689         Try to change EUID independently of main thread
690         */
691         if (pw) {
692                 setgroups(0, NULL);
693                 setregid(pw->pw_gid, pw->pw_gid);
694                 setreuid(pw->pw_uid, pw->pw_uid);
695         }
696 }
697
698 void *emit_thread()
699 {
700         struct Flow *flow;
701         void *p;
702         struct timeval now;
703         struct timespec timeout;
704         int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
705
706         p = (void *) &emit_packet + netflow->HeaderSize;
707         timeout.tv_nsec = 0;
708
709         setuser();
710
711         for (;;) {
712                 pthread_mutex_lock(&emit_mutex);
713                 while (!flows_emit) {
714                         gettimeofday(&now, 0);
715                         timeout.tv_sec = now.tv_sec + emit_timeout;
716                         /* Do not wait until emit_packet will filled - it may be too long */
717                         if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
718                                 pthread_mutex_unlock(&emit_mutex);
719                                 goto sendit;
720                         }
721                 }
722                 flow = flows_emit;
723                 flows_emit = flows_emit->next;
724 #if ((DEBUG) & DEBUG_I)
725                 emit_queue--;
726 #endif          
727                 pthread_mutex_unlock(&emit_mutex);
728
729 #ifdef UPTIME_TRICK
730                 if (!emit_count) {
731                         gettime(&start_time);
732                         start_time.sec -= start_time_offset;
733                 }
734 #endif
735                 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
736                 mem_free(flow);
737                 emit_count++;
738 #ifdef PF2_DEBUG
739                 printf("Emit count = %d\n", emit_count);
740                 fflush(stdout);
741 #endif
742                 if (emit_count == netflow->MaxFlows) {
743                 sendit:
744                         gettime(&emit_time);
745                         p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
746                         size = netflow->HeaderSize + emit_count * netflow->FlowSize;
747                         /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
748                         if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
749                         peer_rot_cur = 0;
750                         for (i = 0; i < npeers; i++) {
751                                 if (peers[i].type == PEER_FILE) {
752                                                 if (netflow->SeqOffset)
753                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
754                                                 peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
755                                                 ret = write(peers[0].write_fd, emit_packet, size);
756                                                 if (ret < size) {
757 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
758                                                         my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
759                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
760 #endif
761 #undef MESSAGES
762                                                 }
763 #if ((DEBUG) & DEBUG_E)
764                                                 commaneelse {
765                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
766                                                                 emit_count, i + 1, peers[i].seq);
767                                                 }
768 #endif
769                                                 peers[0].seq += emit_count;
770
771                                                 /* Rate limit */
772                                                 if (emit_rate_bytes) {
773                                                         sent += size;
774                                                         delay = sent / emit_rate_bytes;
775                                                         if (delay) {
776                                                                 sent %= emit_rate_bytes;
777                                                                 timeout.tv_sec = 0;
778                                                                 timeout.tv_nsec = emit_rate_delay * delay;
779                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
780                                                         }
781                                                 }
782                                         }
783                                 else
784                                 if (peers[i].type == PEER_MIRROR) goto sendreal;
785                                 else
786                                 if (peers[i].type == PEER_ROTATE) 
787                                         if (peer_rot_cur++ == peer_rot_work) {
788                                         sendreal:
789                                                 if (netflow->SeqOffset)
790                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
791                                                 ret = send(peers[i].write_fd, emit_packet, size, 0);
792                                                 if (ret < size) {
793 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
794                                                         my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
795                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
796 #endif
797                                                 }
798 #if ((DEBUG) & DEBUG_E)
799                                                 commaneelse {
800                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
801                                                                 emit_count, i + 1, peers[i].seq);
802                                                 }
803 #endif
804                                                 peers[i].seq += emit_count;
805
806                                                 /* Rate limit */
807                                                 if (emit_rate_bytes) {
808                                                         sent += size;
809                                                         delay = sent / emit_rate_bytes;
810                                                         if (delay) {
811                                                                 sent %= emit_rate_bytes;
812                                                                 timeout.tv_sec = 0;
813                                                                 timeout.tv_nsec = emit_rate_delay * delay;
814                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
815                                                         }
816                                                 }
817                                         }
818                         }
819                         if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
820                         emit_sequence += emit_count;
821                         emit_count = 0;
822 #if ((DEBUG) & DEBUG_I)
823                         emit_pkts++;
824 #endif
825                 }
826         }
827 }       
828
829 void *unpending_thread()
830 {
831         struct timeval now;
832         struct timespec timeout;
833 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
834         char logbuf[256];
835 #endif
836
837         setuser();
838
839         timeout.tv_nsec = 0;
840         pthread_mutex_lock(&unpending_mutex);
841
842         for (;;) {
843                 while (!(pending_tail->flags & FLOW_PENDING)) {
844                         gettimeofday(&now, 0);
845                         timeout.tv_sec = now.tv_sec + unpending_timeout;
846                         pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
847                 }
848
849 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
850                 *logbuf = 0;
851 #endif
852                 if (put_into(pending_tail, COPY_INTO
853 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
854                                 , logbuf
855 #endif
856                         ) < 0) {
857 #if ((DEBUG) & DEBUG_I)
858                         pkts_lost_unpending++;
859 #endif                          
860                 }
861
862 #if ((DEBUG) & DEBUG_U)
863                 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
864 #endif
865
866                 pending_tail->flags = 0;
867                 pending_tail = pending_tail->next;
868 #if ((DEBUG) & DEBUG_I)
869                 pkts_pending_done++;
870 #endif
871         }
872 }
873
874 void *scan_thread()
875 {
876 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
877         char logbuf[256];
878 #endif
879         int i;
880         struct Flow *flow, **flowpp;
881         struct Time now;
882         struct timespec timeout;
883
884         setuser();
885
886         timeout.tv_nsec = 0;
887         pthread_mutex_lock(&scan_mutex);
888
889         for (;;) {
890                 gettime(&now);
891                 timeout.tv_sec = now.sec + scan_interval;
892                 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
893
894                 gettime(&now);
895 #if ((DEBUG) & DEBUG_S)
896                 my_log(LOG_DEBUG, "S: %d", now.sec);
897 #endif
898                 for (i = 0; i < 1 << HASH_BITS ; i++) {
899                         pthread_mutex_lock(&flows_mutex[i]);
900                         flow = flows[i];
901                         flowpp = &flows[i];
902                         while (flow) {
903                                 if (flow->flags & FLOW_FRAG) {
904                                         /* Process fragmented flow */
905                                         if ((now.sec - flow->mtime.sec) > frag_lifetime) {
906                                                 /* Fragmented flow expired - put it into special chain */
907 #if ((DEBUG) & DEBUG_I)
908                                                 flows_fragmented--;
909                                                 flows_total--;
910 #endif
911                                                 *flowpp = flow->next;
912                                                 flow->id = 0;
913                                                 flow->flags &= ~FLOW_FRAG;
914                                                 flow->next = scan_frag_dreg;
915                                                 scan_frag_dreg = flow;
916                                                 flow = *flowpp;
917                                                 continue;
918                                         }
919                                 } else {
920                                         /* Flow is not frgamented */
921                                         if ((now.sec - flow->mtime.sec) > inactive_lifetime
922                                                 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
923                                                 /* Flow expired */
924 #if ((DEBUG) & DEBUG_S)
925                                                 my_log(LOG_DEBUG, "S: E %x", flow);
926 #endif
927 #if ((DEBUG) & DEBUG_I)
928                                                 flows_total--;
929 #endif
930                                                 *flowpp = flow->next;
931                                                 pthread_mutex_lock(&emit_mutex);
932                                                 flow->next = flows_emit;
933                                                 flows_emit = flow;
934 #if ((DEBUG) & DEBUG_I)
935                                                 emit_queue++;
936 #endif                          
937                                                 pthread_mutex_unlock(&emit_mutex);
938                                                 flow = *flowpp;
939                                                 continue;
940                                         }
941                                 }
942                                 flowpp = &flow->next;
943                                 flow = flow->next;
944                         } /* chain loop */
945                         pthread_mutex_unlock(&flows_mutex[i]);
946                 } /* hash loop */
947                 if (flows_emit) pthread_cond_signal(&emit_cond);
948
949                 while (scan_frag_dreg) {
950                         flow = scan_frag_dreg;
951                         scan_frag_dreg = flow->next;
952 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
953                         *logbuf = 0;
954 #endif
955                         put_into(flow, MOVE_INTO
956 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
957                                 , logbuf
958 #endif
959                         );
960 #if ((DEBUG) & DEBUG_S)
961                         my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
962 #endif
963                 }
964         }
965 }
966
967 void *cap_thread()
968 {
969         struct ulog_packet_msg *ulog_msg;
970         struct ip *nl;
971         void *tl;
972         struct Flow *flow;
973         int len, off_frag, psize;
974 #if ((DEBUG) & DEBUG_C)
975         char buf[64];
976         char logbuf[256];
977 #endif
978
979         setuser();
980
981         while (!killed) {
982                 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
983                 if (len <= 0) {
984                         my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
985                         continue;
986                 }
987                 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
988
989 #if ((DEBUG) & DEBUG_C)
990                         sprintf(logbuf, "C: %d", ulog_msg->data_len);
991 #endif
992
993                         nl = (void *) &ulog_msg->payload;
994                         psize = ulog_msg->data_len;
995
996                         /* Sanity check */
997                         if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
998 #if ((DEBUG) & DEBUG_C)
999                                 strcat(logbuf, " U");
1000                                 my_log(LOG_DEBUG, "%s", logbuf);
1001 #endif
1002 #if ((DEBUG) & DEBUG_I)
1003                                 pkts_ignored++;
1004 #endif
1005                                 continue;
1006                         }
1007
1008                         if (pending_head->flags) {
1009 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1010                                 my_log(LOG_ERR,
1011 # if ((DEBUG) & DEBUG_C)
1012                                         "%s %s %s", logbuf,
1013 # else
1014                                         "%s %s",
1015 # endif
1016                                         "pending queue full:", "packet lost");
1017 #endif
1018 #if ((DEBUG) & DEBUG_I)
1019                                 pkts_lost_capture++;
1020 #endif
1021                                 goto done;
1022                         }
1023
1024 #if ((DEBUG) & DEBUG_I)
1025                         pkts_total++;
1026 #endif
1027
1028                         flow = pending_head;
1029
1030                         /* ?FIXME? Add sanity check for ip_len? */
1031                         flow->size = ntohs(nl->ip_len);
1032 #if ((DEBUG) & DEBUG_I)
1033                         size_total += flow->size;
1034 #endif
1035
1036                         flow->sip = nl->ip_src;
1037                         flow->dip = nl->ip_dst;
1038                         if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
1039                                 my_log(LOG_INFO, "Received test flow to corewars.org");
1040                         }
1041                         flow->iif = snmp_index(ulog_msg->indev_name);
1042                         flow->oif = snmp_index(ulog_msg->outdev_name);
1043                         flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1044                         flow->proto = nl->ip_p;
1045                         flow->id = 0;
1046                         flow->tcp_flags = 0;
1047                         flow->pkts = 1;
1048                         flow->sizeF = 0;
1049                         flow->sizeP = 0;
1050                         /* Packets captured from OUTPUT table didn't contains valid timestamp */
1051                         if (ulog_msg->timestamp_sec) {
1052                                 flow->ctime.sec = ulog_msg->timestamp_sec;
1053                                 flow->ctime.usec = ulog_msg->timestamp_usec;
1054                         } else gettime(&flow->ctime);
1055                         flow->mtime = flow->ctime;
1056
1057                         off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1058
1059                         /*
1060                         Offset (from network layer) to transport layer header/IP data
1061                         IOW IP header size ;-)
1062
1063                         ?FIXME?
1064                         Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1065                         */
1066                         off_tl = nl->ip_hl << 2;
1067                         tl = (void *) nl + off_tl;
1068
1069                         /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1070                         flow->sizeF = ntohs(nl->ip_len) - off_tl;
1071                         psize -= off_tl;
1072                         if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1073                         if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1074
1075                         if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1076                                 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1077 #if ((DEBUG) & DEBUG_C)
1078                                 strcat(logbuf, " F");
1079 #endif
1080 #if ((DEBUG) & DEBUG_I)
1081                                 pkts_total_fragmented++;
1082 #endif
1083                                 flow->flags |= FLOW_FRAG;
1084                                 flow->id = nl->ip_id;
1085
1086                                 if (!(ntohs(nl->ip_off) & IP_MF)) {
1087                                         /* Packet whith IP_MF contains information about whole datagram size */
1088                                         flow->flags |= FLOW_LASTFRAG;
1089                                         /* size = frag_offset*8 + data_size */
1090                                         flow->sizeP = off_frag + flow->sizeF;
1091                                 }
1092                         }
1093
1094 #if ((DEBUG) & DEBUG_C)
1095                         sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1096                         strcat(logbuf, buf);
1097                         sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1098                         strcat(logbuf, buf);
1099 #endif
1100
1101                         /*
1102                         Fortunately most interesting transport layer information fit
1103                         into first 8 bytes of IP data field (minimal nonzero size).
1104                         Thus we don't need actual packet reassembling to build whole
1105                         transport layer data. We only check the fragment offset for
1106                         zero value to find packet with this information.
1107                         */
1108                         if (!off_frag && psize >= 8) {
1109                                 switch (flow->proto) {
1110                                         case IPPROTO_TCP:
1111                                         case IPPROTO_UDP:
1112                                                 flow->sp = ((struct udphdr *)tl)->uh_sport;
1113                                                 flow->dp = ((struct udphdr *)tl)->uh_dport;
1114                                                 goto tl_known;
1115
1116 #ifdef ICMP_TRICK
1117                                         case IPPROTO_ICMP:
1118                                                 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1119                                                 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1120                                                 goto tl_known;
1121 #endif
1122 #ifdef ICMP_TRICK_CISCO
1123                                         case IPPROTO_ICMP:
1124                                                 flow->dp = *((int32_t *) tl);
1125                                                 goto tl_known;
1126 #endif
1127
1128                                         default:
1129                                                 /* Unknown transport layer */
1130 #if ((DEBUG) & DEBUG_C)
1131                                                 strcat(logbuf, " U");
1132 #endif
1133                                                 flow->sp = 0;
1134                                                 flow->dp = 0;
1135                                                 break;
1136
1137                                         tl_known:
1138 #if ((DEBUG) & DEBUG_C)
1139                                                 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1140                                                 strcat(logbuf, buf);
1141 #endif
1142                                                 flow->flags |= FLOW_TL;
1143                                 }
1144                         }
1145
1146                         /* Check for tcp flags presence (including CWR and ECE). */
1147                         if (flow->proto == IPPROTO_TCP
1148                                 && off_frag < 16
1149                                 && psize >= 16 - off_frag) {
1150                                 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1151 #if ((DEBUG) & DEBUG_C)
1152                                 sprintf(buf, " TCP:%x", flow->tcp_flags);
1153                                 strcat(logbuf, buf);
1154 #endif
1155                         }
1156
1157 #if ((DEBUG) & DEBUG_C)
1158                         sprintf(buf, " => %x", (unsigned) flow);
1159                         strcat(logbuf, buf);
1160                         my_log(LOG_DEBUG, "%s", logbuf);
1161 #endif
1162
1163 #if ((DEBUG) & DEBUG_I)
1164                         pkts_pending++;
1165                         pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1166                         if (pending_queue_trace < pending_queue_trace_candidate)
1167                                 pending_queue_trace = pending_queue_trace_candidate;
1168 #endif
1169
1170                         /* Flow complete - inform unpending_thread() about it */
1171                         pending_head->flags |= FLOW_PENDING;
1172                         pending_head = pending_head->next;
1173                 done:
1174                         pthread_cond_signal(&unpending_cond);
1175                 }
1176         }
1177         return 0;
1178 }
1179
1180 int main(int argc, char **argv)
1181 {
1182         char errpbuf[512];
1183         char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1184         int c, i, write_fd, memory_limit = 0;
1185         struct addrinfo hints, *res;
1186         struct sockaddr_in saddr;
1187         pthread_attr_t tattr;
1188         struct sigaction sigact;
1189         static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1190         struct timeval timeout;
1191
1192         sched_min = sched_get_priority_min(SCHED);
1193         sched_max = sched_get_priority_max(SCHED);
1194
1195         memset(&saddr, 0 , sizeof(saddr));
1196         memset(&hints, 0 , sizeof(hints));
1197         hints.ai_flags = AI_PASSIVE;
1198         hints.ai_family = AF_INET;
1199         hints.ai_socktype = SOCK_DGRAM;
1200
1201         /* Process command line options */
1202
1203         opterr = 0;
1204         while ((c = my_getopt(argc, argv, parms)) != -1) {
1205                 switch (c) {
1206                         case '?':
1207                                 usage();
1208
1209                         case 'h':
1210                                 usage();
1211                 }
1212         }
1213
1214         if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1215         if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1216         if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1217         if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1218         if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1219         if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1220         if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1221         if (parms[nflag].count) {
1222                 switch (atoi(parms[nflag].arg)) {
1223                         case 1:
1224                                 netflow = &NetFlow1;
1225                                 break;
1226
1227                         case 5:
1228                                 break;
1229
1230                         case 7:
1231                                 netflow = &NetFlow7;
1232                                 break;
1233
1234                         default:
1235                                 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1236                                 exit(1);
1237                 }
1238         }
1239         if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1240         if (parms[lflag].count) {
1241                 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1242                         *log_suffix++ = 0;
1243                         if (*log_suffix) {
1244                                 sprintf(errpbuf, "[%s]", log_suffix);
1245                                 strcat(ident, errpbuf);
1246                         }
1247                 }
1248                 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1249                 if (log_suffix) *--log_suffix = ':';
1250         }
1251         if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1252         err_malloc:
1253                 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1254                 exit(1);
1255         }
1256         sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1257         if (parms[qflag].count) {
1258                 pending_queue_length = atoi(parms[qflag].arg);
1259                 if (pending_queue_length < 1) {
1260                         fprintf(stderr, "Illegal %s\n", "pending queue length");
1261                         exit(1);
1262                 }
1263         }
1264         if (parms[rflag].count) {
1265                 schedp.sched_priority = atoi(parms[rflag].arg);
1266                 if (schedp.sched_priority
1267                         && (schedp.sched_priority < sched_min
1268                                 || schedp.sched_priority > sched_max)) {
1269                         fprintf(stderr, "Illegal %s\n", "realtime priority");
1270                         exit(1);
1271                 }
1272         }
1273         if (parms[Bflag].count) {
1274                 sockbufsize = atoi(parms[Bflag].arg) << 10;
1275         }
1276         if (parms[bflag].count) {
1277                 bulk_quantity = atoi(parms[bflag].arg);
1278                 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1279                         fprintf(stderr, "Illegal %s\n", "bulk size");
1280                         exit(1);
1281                 }
1282         }
1283         if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1284         if (parms[Xflag].count) {
1285                 for(i = 0; parms[Xflag].arg[i]; i++)
1286                         if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1287                 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1288                         goto err_malloc;
1289                 rule = strtok(parms[Xflag].arg, ":");
1290                 for (i = 0; rule; i++) {
1291                         snmp_rules[i].len = strlen(rule);
1292                         if (snmp_rules[i].len > IFNAMSIZ) {
1293                                 fprintf(stderr, "Illegal %s\n", "interface basename");
1294                                 exit(1);
1295                         }
1296                         strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1297                         if (!*(rule - 1)) *(rule - 1) = ',';
1298                         rule = strtok(NULL, ",");
1299                         if (!rule) {
1300                                 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1301                                 exit(1);
1302                         }
1303                         snmp_rules[i].base = atoi(rule);
1304                         *(rule - 1) = ':';
1305                         rule = strtok(NULL, ":");
1306                 }
1307                 nsnmp_rules = i;
1308         }
1309         if (parms[tflag].count)
1310                 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1311         if (parms[aflag].count) {
1312                 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1313                 bad_lhost:
1314                         fprintf(stderr, "Illegal %s\n", "source address");
1315                         exit(1);
1316                 } else {
1317                         saddr = *((struct sockaddr_in *) res->ai_addr);
1318                         freeaddrinfo(res);
1319                 }
1320         }
1321         if (parms[uflag].count) 
1322                 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1323                         fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1324                         exit(1);
1325                 }
1326
1327
1328         /* Process collectors parameters. Brrrr... :-[ */
1329
1330         npeers = argc - optind;
1331         if (npeers > 1) {
1332                 /* Send to remote Netflow collector */
1333                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1334                 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1335                         dhost = argv[i];
1336                         if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1337                         *dport++ = 0;
1338                         if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1339                                 fprintf(stderr, "socket(): %s\n", strerror(errno));
1340                                 exit(1);
1341                         }
1342                         peers[npeers].write_fd = write_fd;
1343                         peers[npeers].type = PEER_MIRROR;
1344                         peers[npeers].laddr = saddr;
1345                         peers[npeers].seq = 0;
1346                         if ((lhost = strchr(dport, '/'))) {
1347                                 *lhost++ = 0;
1348                                 if ((type = strchr(lhost, '/'))) {
1349                                         *type++ = 0;
1350                                         switch (*type) {
1351                                                 case 0:
1352                                                 case 'm':
1353                                                         break;
1354
1355                                                 case 'r':
1356                                                         peers[npeers].type = PEER_ROTATE;
1357                                                         npeers_rot++;
1358                                                         break;
1359
1360                                                 default:
1361                                                         goto bad_collector;
1362                                         }
1363                                 }
1364                                 if (*lhost) {
1365                                         if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1366                                         peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1367                                         freeaddrinfo(res);
1368                                 }
1369                         }
1370                         if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1371                                                 sizeof(struct sockaddr_in))) {
1372                                 fprintf(stderr, "bind(): %s\n", strerror(errno));
1373                                 exit(1);
1374                         }
1375                         if (getaddrinfo(dhost, dport, &hints, &res)) {
1376 bad_collector:
1377                                 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1378                                 exit(1);
1379                         }
1380                         peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1381                         freeaddrinfo(res);
1382                         if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1383                                                 sizeof(struct sockaddr_in))) {
1384                                 fprintf(stderr, "connect(): %s\n", strerror(errno));
1385                                 exit(1);
1386                         }
1387
1388                         /* Restore command line */
1389                         if (type) *--type = '/';
1390                         if (lhost) *--lhost = '/';
1391                         *--dport = ':';
1392                 }
1393         }
1394         else if (parms[fflag].count) {
1395                 // log into a file
1396                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1397                 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1398                 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1399                 
1400                 peers[npeers].write_fd = -1;
1401                 peers[npeers].type = PEER_FILE;
1402                 peers[npeers].seq = 0;
1403                 npeers++;
1404         }
1405         else 
1406                 usage();
1407
1408
1409         if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1410         ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1411         if (!ulog_handle) {
1412                 fprintf(stderr, "libipulog initialization error: %s",
1413                         ipulog_strerror(ipulog_errno));
1414                 exit(1);
1415         }
1416         if (sockbufsize)
1417                 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1418                         &sockbufsize, sizeof(sockbufsize)) < 0)
1419                         fprintf(stderr, "setsockopt(): %s", strerror(errno));
1420
1421         /* Daemonize (if log destination stdout-free) */
1422
1423         my_log_open(ident, verbosity, log_dest);
1424         if (!(log_dest & 2)) {
1425                 switch (fork()) {
1426                         case -1:
1427                                 fprintf(stderr, "fork(): %s", strerror(errno));
1428                                 exit(1);
1429
1430                         case 0:
1431                                 setsid();
1432                                 freopen("/dev/null", "r", stdin);
1433                                 freopen("/dev/null", "w", stdout);
1434                                 freopen("/dev/null", "w", stderr);
1435                                 break;
1436
1437                         default:
1438                                 exit(0);
1439                 }
1440         } else {
1441                 setvbuf(stdout, (char *)0, _IONBF, 0);
1442                 setvbuf(stderr, (char *)0, _IONBF, 0);
1443         }
1444
1445         pid = getpid();
1446         sprintf(errpbuf, "[%ld]", (long) pid);
1447         strcat(ident, errpbuf);
1448
1449         /* Initialization */
1450
1451         hash_init(); /* Actually for crc16 only */
1452         mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1453         for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1454
1455 #ifdef UPTIME_TRICK
1456         /* Hope 12 days is enough :-/ */
1457         start_time_offset = 1 << 20;
1458
1459         /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1460 #endif
1461         gettime(&start_time);
1462
1463         /*
1464         Build static pending queue as circular buffer.
1465         */
1466         if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1467         pending_tail = pending_head;
1468         for (i = pending_queue_length - 1; i--;) {
1469                 if (!(pending_tail->next = mem_alloc())) {
1470                 err_mem_alloc:
1471                         my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1472                         exit(1);
1473                 }
1474                 pending_tail = pending_tail->next;
1475         }
1476         pending_tail->next = pending_head;
1477         pending_tail = pending_head;
1478
1479         sigemptyset(&sig_mask);
1480         sigact.sa_handler = &sighandler;
1481         sigact.sa_mask = sig_mask;
1482         sigact.sa_flags = 0;
1483         sigaddset(&sig_mask, SIGTERM);
1484         sigaction(SIGTERM, &sigact, 0);
1485 #if ((DEBUG) & DEBUG_I)
1486         sigaddset(&sig_mask, SIGUSR1);
1487         sigaction(SIGUSR1, &sigact, 0);
1488 #endif
1489         if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1490                 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1491                 exit(1);
1492         }
1493
1494         my_log(LOG_INFO, "Starting %s...", VERSION);
1495
1496         if (parms[cflag].count) {
1497                 if (chdir(parms[cflag].arg) || chroot(".")) {
1498                         my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1499                         exit(1);
1500                 }
1501         }
1502
1503         schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1504         pthread_attr_init(&tattr);
1505         for (i = 0; i < THREADS - 1; i++) {
1506                 if (schedp.sched_priority > 0) {
1507                         if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1508                                 (pthread_attr_setschedparam(&tattr, &schedp))) {
1509                                 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1510                                 exit(1);
1511                         }
1512                 }
1513                 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1514                         my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1515                         exit(1);
1516                 }
1517                 pthread_detach(thid);
1518                 schedp.sched_priority++;
1519         }
1520
1521         if (pw) {
1522                 if (setgroups(0, NULL)) {
1523                         my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1524                         exit(1);
1525                 }
1526                 if (setregid(pw->pw_gid, pw->pw_gid)) {
1527                         my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1528                         exit(1);
1529                 }
1530                 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1531                         my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1532                         exit(1);
1533                 }
1534         }
1535
1536         if (!(pidfile = fopen(pidfilepath, "w")))
1537                 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1538         else {
1539                 fprintf(pidfile, "%ld\n", (long) pid);
1540                 fclose(pidfile);
1541         }
1542
1543         my_log(LOG_INFO, "pid: %d", pid);
1544         my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1545                 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1546                 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1547                 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1548                 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1549                 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1550                 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1551         for (i = 0; i < nsnmp_rules; i++) {
1552                 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1553                         i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1554         }
1555         for (i = 0; i < npeers; i++) {
1556                 switch (peers[i].type) {
1557                         case PEER_MIRROR:
1558                                 c = 'm';
1559                                 break;
1560                         case PEER_ROTATE:
1561                                 c = 'r';
1562                                 break;
1563                 }
1564                 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1565                 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1566                         inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1567         }
1568
1569         pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1570
1571         timeout.tv_usec = 0;
1572         while (!killed
1573                 || (total_elements - free_elements - pending_queue_length)
1574                 || emit_count
1575                 || pending_tail->flags) {
1576
1577                 if (!sigs) {
1578                         timeout.tv_sec = scan_interval;
1579                         select(0, 0, 0, 0, &timeout);
1580                 }
1581
1582                 if (sigs & SIGTERM_MASK && !killed) {
1583                         sigs &= ~SIGTERM_MASK;
1584                         my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1585                         scan_interval = 1;
1586                         frag_lifetime = -1;
1587                         active_lifetime = -1;
1588                         inactive_lifetime = -1;
1589                         emit_timeout = 1;
1590                         unpending_timeout = 1;
1591                         killed = 1;
1592                         pthread_cond_signal(&scan_cond);
1593                         pthread_cond_signal(&unpending_cond);
1594                 }
1595
1596 #if ((DEBUG) & DEBUG_I)
1597                 if (sigs & SIGUSR1_MASK) {
1598                         sigs &= ~SIGUSR1_MASK;
1599                         info_debug();
1600                 }
1601 #endif
1602         }
1603         remove(pidfilepath);
1604 #if ((DEBUG) & DEBUG_I)
1605         info_debug();
1606 #endif
1607         my_log(LOG_INFO, "Done.");
1608 #ifdef WALL
1609         return 0;
1610 #endif
1611 }