innocuous
[fprobe-ulog.git] / src / fprobe-ulog.c
1 /*
2         Copyright (C) Slava Astashonok <sla@0n.ru>
3
4         This program is free software; you can redistribute it and/or
5         modify it under the terms of the GNU General Public License.
6
7         $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
8         **
9 */
10
11 #include <common.h>
12
13 /* stdout, stderr, freopen() */
14 #include <stdio.h>
15
16 /* atoi(), exit() */
17 #include <stdlib.h>
18
19 /* getopt(), alarm(), getpid(), setsid(), chdir() */
20 #include <unistd.h>
21
22 /* strerror() */
23 #include <string.h>
24
25 /* sig*() */
26 #include <signal.h>
27
28 #include <libipulog/libipulog.h>
/*
 * Local copy of libipulog's handle layout, declared here so this file can
 * reach members (such as the socket descriptor) that the library header
 * presumably keeps opaque. Member order and types must stay identical to
 * the library's own definition -- TODO confirm against the libipulog
 * version in use.
 */
struct ipulog_handle {
	int fd;				/* presumably the netlink socket */
	uint8_t blocking;
	struct sockaddr_nl local, peer;	/* local / peer netlink addresses */
	struct nlmsghdr *last_nlhdr;
};
36
37 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
38 #include <sys/types.h>
39 #include <netinet/in_systm.h>
40 #include <sys/socket.h>
41 #include <netinet/in.h>
42 #include <arpa/inet.h>
43 #include <netinet/ip.h>
44 #include <netinet/tcp.h>
45 #include <netinet/udp.h>
46 #include <netinet/ip_icmp.h>
47 #include <net/if.h>
48
49 #include <sys/param.h>
50 #include <pwd.h>
51 #ifdef OS_LINUX
52 #include <grp.h>
53 #endif
54
55 /* pthread_*() */
56 #include <pthread.h>
57
58 /* errno */
59 #include <errno.h>
60
61 /* getaddrinfo() */
62 #include <netdb.h>
63
64 /* nanosleep() */
65 #include <time.h>
66
67 /* gettimeofday() */
68 #include <sys/time.h>
69
70 /* scheduling */
71 #include <sched.h>
72
73 /* select() (POSIX)*/
74 #include <sys/select.h>
75
76 /* open() */
77 #include <sys/stat.h>
78 #include <fcntl.h>
79
80 #include <fprobe-ulog.h>
81 #include <my_log.h>
82 #include <my_getopt.h>
83 #include <netflow.h>
84 #include <hash.h>
85 #include <mem.h>
86
/*
 * Indices into parms[]: one enumerator per command-line option, in the
 * same order as the table. Each name is the option letter followed by
 * "flag" (e.g. Mflag <-> -M).
 */
enum {
	aflag = 0,
	Bflag, bflag, cflag, dflag,
	eflag, Eflag, fflag, gflag,
	hflag, lflag, mflag, Mflag,
	nflag, qflag, rflag, sflag,
	tflag, Tflag, Uflag, uflag,
	vflag, Xflag,
};
112
113 static struct getopt_parms parms[] = {
114         {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
115         {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
116         {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
117         {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
118         {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
119         {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
120         {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
121         {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
122         {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
123         {'h', 0, 0, 0},
124         {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
125         {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
126         {'M', 0, 0, 0},
127         {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
128         {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
129         {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
130         {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
131         {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
132         {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
133         {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
134         {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
135         {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
136         {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
137         {0, 0, 0, 0}
138 };
139
/* getopt() state (also declared by <unistd.h>). */
extern char *optarg;
extern int optind, opterr, optopt;

/*
 * errno is deliberately not redeclared here. <errno.h> already declares it,
 * and on thread-safe C libraries it is a macro for a per-thread lvalue, so
 * a hand-written `extern int errno;` is non-portable.
 */

/* NetFlow version descriptors, defined in another translation unit. */
extern struct NetFlow NetFlow1;
extern struct NetFlow NetFlow5;
extern struct NetFlow NetFlow7;
147
148 #define mark_is_tos parms[Mflag].count
149 static unsigned scan_interval = 5;
150 static int frag_lifetime = 30;
151 static int inactive_lifetime = 60;
152 static int active_lifetime = 300;
153 static int sockbufsize;
154 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
155 #if (MEM_BITS == 0) || (MEM_BITS == 16)
156 #define BULK_QUANTITY 10000
157 #else
158 #define BULK_QUANTITY 200
159 #endif
160
161 static unsigned epoch_length=60, log_epochs=1; 
162 static unsigned cur_epoch=0,prev_uptime=0;
163
164 static unsigned bulk_quantity = BULK_QUANTITY;
165 static unsigned pending_queue_length = 100;
166 static struct NetFlow *netflow = &NetFlow5;
167 static unsigned verbosity = 6;
168 static unsigned log_dest = MY_LOG_SYSLOG;
169 static struct Time start_time;
170 static long start_time_offset;
171 static int off_tl;
172 /* From mem.c */
173 extern unsigned total_elements;
174 extern unsigned free_elements;
175 extern unsigned total_memory;
176 #if ((DEBUG) & DEBUG_I)
177 static unsigned emit_pkts, emit_queue;
178 static uint64_t size_total;
179 static unsigned pkts_total, pkts_total_fragmented;
180 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
181 static unsigned pkts_pending, pkts_pending_done;
182 static unsigned pending_queue_trace, pending_queue_trace_candidate;
183 static unsigned flows_total, flows_fragmented;
184 #endif
185 static unsigned emit_count;
186 static uint32_t emit_sequence;
187 static unsigned emit_rate_bytes, emit_rate_delay;
188 static struct Time emit_time;
189 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
190 static pthread_t thid;
191 static sigset_t sig_mask;
192 static struct sched_param schedp;
193 static int sched_min, sched_max;
194 static int npeers, npeers_rot;
195 static struct peer *peers;
196 static int sigs;
197
198 static struct Flow *flows[1 << HASH_BITS];
199 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
200
201 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
202 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
203
204 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
205 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
206 static struct Flow *pending_head, *pending_tail;
207 static struct Flow *scan_frag_dreg;
208
209 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
210 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
211 static struct Flow *flows_emit;
212
213 static char ident[256] = "fprobe-ulog";
214 static FILE *pidfile;
215 static char *pidfilepath;
216 static pid_t pid;
217 static int killed;
218 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
219 static struct ipulog_handle *ulog_handle;
220 static uint32_t ulog_gmask = 1;
221 static char *cap_buf;
222 static int nsnmp_rules;
223 static struct snmp_rule *snmp_rules;
224 static struct passwd *pw = 0;
225
226 void usage()
227 {
228         fprintf(stdout,
229                 "fprobe-ulog: a NetFlow probe. Version %s\n"
230                 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
231                 "\n"
232                 "-h\t\tDisplay this help\n"
233                 "-U <mask>\tULOG group bitwise mask [1]\n"
234                 "-s <seconds>\tHow often scan for expired flows [5]\n"
235                 "-g <seconds>\tFragmented flow lifetime [30]\n"
236                 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
237                 "-f <filename>\tLog flow data in a file\n"
238                 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
239                 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
240                 "-a <address>\tUse <address> as source for NetFlow flow\n"
241                 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
242                 "-M\t\tUse netfilter mark value as ToS flag\n"
243                 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
244                 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
245                 "-q <flows>\tPending queue length [100]\n"
246                 "-B <kilobytes>\tKernel capture buffer size [0]\n"
247                 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
248                 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
249                 "-c <directory>\tDirectory to chroot to\n"
250                 "-u <user>\tUser to run as\n"
251                 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
252                 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
253                 "-y <remote:port>\tAddress of the NetFlow collector\n"
254                 "-f <writable file>\tFile to write data into\n"
255                 "-T <n>\tRotate log file every n epochs\n"
256                 "-E <[1..60]>\tSize of an epoch in minutes\n",
257                 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
258         exit(0);
259 }
260
#if ((DEBUG) & DEBUG_I)
/*
 * Dump the DEBUG_I packet/flow/memory counters at LOG_DEBUG.
 * All counters are unsigned, so they use %u. size_total is uint64_t and is
 * widened explicitly for %llu; the old %lld/%d specifiers mismatched the
 * argument types, which is undefined behaviour for printf-style calls.
 */
void info_debug()
{
	my_log(LOG_DEBUG, "I: received:%u/%u (%llu) pending:%u/%u",
		pkts_total, pkts_total_fragmented, (unsigned long long) size_total,
		pkts_pending - pkts_pending_done, pending_queue_trace);
	my_log(LOG_DEBUG, "I: ignored:%u lost:%u+%u",
		pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
	my_log(LOG_DEBUG, "I: cache:%u/%u emit:%u/%u/%u",
		flows_total, flows_fragmented, (unsigned) emit_sequence, emit_pkts, emit_queue);
	my_log(LOG_DEBUG, "I: memory:%u/%u (%u)",
		total_elements, free_elements, total_memory);
}
#endif
275
276 void sighandler(int sig)
277 {
278         switch (sig) {
279                 case SIGTERM:
280                         sigs |= SIGTERM_MASK;
281                         break;
282 #if ((DEBUG) & DEBUG_I)
283                 case SIGUSR1:
284                         sigs |= SIGUSR1_MASK;
285                         break;
286 #endif
287         }
288 }
289
290 void gettime(struct Time *now)
291 {
292         struct timeval t;
293
294         gettimeofday(&t, 0);
295         now->sec = t.tv_sec;
296         now->usec = t.tv_usec;
297 }
298
299 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
300 {
301         return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
302 }
303
304 /* Uptime in miliseconds */
305 uint32_t getuptime(struct Time *t)
306 {
307         /* Maximum uptime is about 49/2 days */
308         return cmpmtime(t, &start_time);
309 }
310
311 hash_t hash_flow(struct Flow *flow)
312 {
313         if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
314         else return hash(flow, sizeof(struct Flow_TL));
315 }
316
317 uint16_t snmp_index(char *name) {
318         uint32_t i;
319
320         if (!*name) return 0;
321
322         for (i = 0; (int) i < nsnmp_rules; i++) {
323                 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
324                 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
325         }
326
327         if ((i = if_nametoindex(name))) return i;
328
329         return -1;
330 }
331
332 inline void copy_flow(struct Flow *src, struct Flow *dst)
333 {
334         dst->iif = src->iif;
335         dst->oif = src->oif;
336         dst->sip = src->sip;
337         dst->dip = src->dip;
338         dst->tos = src->tos;
339         dst->proto = src->proto;
340         dst->tcp_flags = src->tcp_flags;
341         dst->id = src->id;
342         dst->sp = src->sp;
343         dst->dp = src->dp;
344         dst->pkts = src->pkts;
345         dst->size = src->size;
346         dst->sizeF = src->sizeF;
347         dst->sizeP = src->sizeP;
348         dst->ctime = src->ctime;
349         dst->mtime = src->mtime;
350         dst->flags = src->flags;
351 }
352
353 unsigned get_log_fd(char *fname, unsigned cur_fd) {
354         struct Time now;
355         unsigned cur_uptime;
356         int ret_fd;
357         gettime(&now);
358         cur_uptime = getuptime(&now);
359
360         /* Epoch lenght in minutes */
361         if ((cur_uptime - prev_uptime) > (1000 * 60 * epoch_length)) {
362                 char nextname[MAX_PATH_LEN];
363                 int write_fd;
364                 prev_uptime = cur_uptime;
365                 cur_epoch = (cur_epoch + 1) % log_epochs;
366                 close(cur_fd);
367                 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
368                 if ((write_fd = open(nextname, O_WRONLY|O_CREAT)) < 0) {
369                         fprintf(stderr, "open(): %s (%s)\n", nextname, strerror(errno));
370                         exit(1);
371                 }
372                 ret_fd = write_fd;
373         }
374         else
375                 ret_fd = cur_fd;
376         return(ret_fd);
377 }
378
379 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
380 {
381         struct Flow **flowpp;
382
383 #ifdef WALL
384         flowpp = 0;
385 #endif
386
387         if (prev) flowpp = *prev;
388
389         while (where) {
390                 if (where->sip.s_addr == what->sip.s_addr
391                         && where->dip.s_addr == what->dip.s_addr
392                         && where->proto == what->proto) {
393                         switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
394                                 case 0:
395                                         /* Both unfragmented */
396                                         if ((what->sp == where->sp)
397                                                 && (what->dp == where->dp)) goto done;
398                                         break;
399                                 case 2:
400                                         /* Both fragmented */
401                                         if (where->id == what->id) goto done;
402                                         break;
403                         }
404                 }
405                 flowpp = &where->next;
406                 where = where->next;
407         }
408 done:
409         if (prev) *prev = flowpp;
410         return where;
411 }
412
413 int put_into(struct Flow *flow, int flag
414 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
415         , char *logbuf
416 #endif
417 )
418 {
419         int ret = 0;
420         hash_t h;
421         struct Flow *flown, **flowpp;
422 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
423         char buf[64];
424 #endif
425
426         h = hash_flow(flow);
427 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
428         sprintf(buf, " %x H:%04x", (unsigned) flow, h);
429         strcat(logbuf, buf);
430 #endif
431         pthread_mutex_lock(&flows_mutex[h]);
432         flowpp = &flows[h];
433         if (!(flown = find(flows[h], flow, &flowpp))) {
434                 /* No suitable flow found - add */
435                 if (flag == COPY_INTO) {
436                         if ((flown = mem_alloc())) {
437                                 copy_flow(flow, flown);
438                                 flow = flown;
439                         } else {
440 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
441                                 my_log(LOG_ERR, "%s %s. %s",
442                                         "mem_alloc():", strerror(errno), "packet lost");
443 #endif
444                                 return -1;
445                         }
446                 }
447                 flow->next = flows[h];
448                 flows[h] = flow;
449 #if ((DEBUG) & DEBUG_I)
450                 flows_total++;
451                 if (flow->flags & FLOW_FRAG) flows_fragmented++;
452 #endif
453 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
454                 if (flown) {
455                         sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
456                         strcat(logbuf, buf);
457                 }
458 #endif
459         } else {
460                 /* Found suitable flow - update */
461 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
462                 sprintf(buf, " +> %x", (unsigned) flown);
463                 strcat(logbuf, buf);
464 #endif
465                 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
466                         flown->mtime = flow->mtime;
467                 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
468                         flown->ctime = flow->ctime;
469                 flown->tcp_flags |= flow->tcp_flags;
470                 flown->size += flow->size;
471                 flown->pkts += flow->pkts;
472                 if (flow->flags & FLOW_FRAG) {
473                         /* Fragmented flow require some additional work */
474                         if (flow->flags & FLOW_TL) {
475                                 /*
476                                 ?FIXME?
477                                 Several packets with FLOW_TL (attack)
478                                 */
479                                 flown->sp = flow->sp;
480                                 flown->dp = flow->dp;
481                         }
482                         if (flow->flags & FLOW_LASTFRAG) {
483                                 /*
484                                 ?FIXME?
485                                 Several packets with FLOW_LASTFRAG (attack)
486                                 */
487                                 flown->sizeP = flow->sizeP;
488                         }
489                         flown->flags |= flow->flags;
490                         flown->sizeF += flow->sizeF;
491                         if ((flown->flags & FLOW_LASTFRAG)
492                                 && (flown->sizeF >= flown->sizeP)) {
493                                 /* All fragments received - flow reassembled */
494                                 *flowpp = flown->next;
495                                 pthread_mutex_unlock(&flows_mutex[h]);
496 #if ((DEBUG) & DEBUG_I)
497                                 flows_total--;
498                                 flows_fragmented--;
499 #endif
500                                 flown->id = 0;
501                                 flown->flags &= ~FLOW_FRAG;
502 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
503                                 strcat(logbuf," R");
504 #endif
505                                 ret = put_into(flown, MOVE_INTO
506 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
507                                                 , logbuf
508 #endif
509                                         );
510                         }
511                 }
512                 if (flag == MOVE_INTO) mem_free(flow);
513         }
514         pthread_mutex_unlock(&flows_mutex[h]);
515         return ret;
516 }
517
518 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
519 {
520         int i;
521
522         for (i = 0; i < fields; i++) {
523 #if ((DEBUG) & DEBUG_F)
524                 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
525 #endif
526                 switch (format[i]) {
527                         case NETFLOW_IPV4_SRC_ADDR:
528                                 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
529                                 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
530                                 break;
531
532                         case NETFLOW_IPV4_DST_ADDR:
533                                 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
534                                 p += NETFLOW_IPV4_DST_ADDR_SIZE;
535                                 break;
536
537                         case NETFLOW_INPUT_SNMP:
538                                 *((uint16_t *) p) = htons(flow->iif);
539                                 p += NETFLOW_INPUT_SNMP_SIZE;
540                                 break;
541
542                         case NETFLOW_OUTPUT_SNMP:
543                                 *((uint16_t *) p) = htons(flow->oif);
544                                 p += NETFLOW_OUTPUT_SNMP_SIZE;
545                                 break;
546
547                         case NETFLOW_PKTS_32:
548                                 *((uint32_t *) p) = htonl(flow->pkts);
549                                 p += NETFLOW_PKTS_32_SIZE;
550                                 break;
551
552                         case NETFLOW_BYTES_32:
553                                 *((uint32_t *) p) = htonl(flow->size);
554                                 p += NETFLOW_BYTES_32_SIZE;
555                                 break;
556
557                         case NETFLOW_FIRST_SWITCHED:
558                                 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
559                                 p += NETFLOW_FIRST_SWITCHED_SIZE;
560                                 break;
561
562                         case NETFLOW_LAST_SWITCHED:
563                                 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
564                                 p += NETFLOW_LAST_SWITCHED_SIZE;
565                                 break;
566
567                         case NETFLOW_L4_SRC_PORT:
568                                 *((uint16_t *) p) = flow->sp;
569                                 p += NETFLOW_L4_SRC_PORT_SIZE;
570                                 break;
571
572                         case NETFLOW_L4_DST_PORT:
573                                 *((uint16_t *) p) = flow->dp;
574                                 p += NETFLOW_L4_DST_PORT_SIZE;
575                                 break;
576
577                         case NETFLOW_PROT:
578                                 *((uint8_t *) p) = flow->proto;
579                                 p += NETFLOW_PROT_SIZE;
580                                 break;
581
582                         case NETFLOW_SRC_TOS:
583                                 *((uint8_t *) p) = flow->tos;
584                                 p += NETFLOW_SRC_TOS_SIZE;
585                                 break;
586
587                         case NETFLOW_TCP_FLAGS:
588                                 *((uint8_t *) p) = flow->tcp_flags;
589                                 p += NETFLOW_TCP_FLAGS_SIZE;
590                                 break;
591
592                         case NETFLOW_VERSION:
593                                 *((uint16_t *) p) = htons(netflow->Version);
594                                 p += NETFLOW_VERSION_SIZE;
595                                 break;
596
597                         case NETFLOW_COUNT:
598                                 *((uint16_t *) p) = htons(emit_count);
599                                 p += NETFLOW_COUNT_SIZE;
600                                 break;
601
602                         case NETFLOW_UPTIME:
603                                 *((uint32_t *) p) = htonl(getuptime(&emit_time));
604                                 p += NETFLOW_UPTIME_SIZE;
605                                 break;
606
607                         case NETFLOW_UNIX_SECS:
608                                 *((uint32_t *) p) = htonl(emit_time.sec);
609                                 p += NETFLOW_UNIX_SECS_SIZE;
610                                 break;
611
612                         case NETFLOW_UNIX_NSECS:
613                                 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
614                                 p += NETFLOW_UNIX_NSECS_SIZE;
615                                 break;
616
617                         case NETFLOW_FLOW_SEQUENCE:
618                                 //*((uint32_t *) p) = htonl(emit_sequence);
619                                 *((uint32_t *) p) = 0;
620                                 p += NETFLOW_FLOW_SEQUENCE_SIZE;
621                                 break;
622
623                         case NETFLOW_PAD8:
624                         /* Unsupported (uint8_t) */
625                         case NETFLOW_ENGINE_TYPE:
626                         case NETFLOW_ENGINE_ID:
627                         case NETFLOW_FLAGS7_1:
628                         case NETFLOW_SRC_MASK:
629                         case NETFLOW_DST_MASK:
630                                 *((uint8_t *) p) = 0;
631                                 p += NETFLOW_PAD8_SIZE;
632                                 break;
633                         case NETFLOW_PLANETLAB_XID:
634                                 *((uint16_t *) p) = flow->tos;
635                                 p += NETFLOW_PLANETLAB_XID_SIZE;
636                                 break;
637                         case NETFLOW_PAD16:
638                         /* Unsupported (uint16_t) */
639                         case NETFLOW_SRC_AS:
640                         case NETFLOW_DST_AS:
641                         case NETFLOW_FLAGS7_2:
642                                 *((uint16_t *) p) = 0;
643                                 p += NETFLOW_PAD16_SIZE;
644                                 break;
645
646                         case NETFLOW_PAD32:
647                         /* Unsupported (uint32_t) */
648                         case NETFLOW_IPV4_NEXT_HOP:
649                         case NETFLOW_ROUTER_SC:
650                                 *((uint32_t *) p) = 0;
651                                 p += NETFLOW_PAD32_SIZE;
652                                 break;
653
654                         default:
655                                 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
656                                         format, i, format[i]);
657                                 exit(1);
658                 }
659         }
660 #if ((DEBUG) & DEBUG_F)
661         my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
662 #endif
663         return p;
664 }
665
666 void setuser() {
667         /*
668         Workaround for clone()-based threads
669         Try to change EUID independently of main thread
670         */
671         if (pw) {
672                 setgroups(0, NULL);
673                 setregid(pw->pw_gid, pw->pw_gid);
674                 setreuid(pw->pw_uid, pw->pw_uid);
675         }
676 }
677
678 void *emit_thread()
679 {
680         struct Flow *flow;
681         void *p;
682         struct timeval now;
683         struct timespec timeout;
684         int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
685
686         p = (void *) &emit_packet + netflow->HeaderSize;
687         timeout.tv_nsec = 0;
688
689         setuser();
690
691         for (;;) {
692                 pthread_mutex_lock(&emit_mutex);
693                 while (!flows_emit) {
694                         gettimeofday(&now, 0);
695                         timeout.tv_sec = now.tv_sec + emit_timeout;
696                         /* Do not wait until emit_packet will filled - it may be too long */
697                         if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
698                                 pthread_mutex_unlock(&emit_mutex);
699                                 goto sendit;
700                         }
701                 }
702                 flow = flows_emit;
703                 flows_emit = flows_emit->next;
704 #if ((DEBUG) & DEBUG_I)
705                 emit_queue--;
706 #endif          
707                 pthread_mutex_unlock(&emit_mutex);
708
709 #ifdef UPTIME_TRICK
710                 if (!emit_count) {
711                         gettime(&start_time);
712                         start_time.sec -= start_time_offset;
713                 }
714 #endif
715                 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
716                 mem_free(flow);
717                 emit_count++;
718 #ifdef PF2_DEBUG
719                 printf("Emit count = %d\n", emit_count);
720                 fflush(stdout);
721 #endif
722                 if (emit_count == netflow->MaxFlows) {
723                 sendit:
724                         gettime(&emit_time);
725                         p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
726                         size = netflow->HeaderSize + emit_count * netflow->FlowSize;
727                         /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
728                         if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
729                         peer_rot_cur = 0;
730                         for (i = 0; i < npeers; i++) {
731                                 if (peers[0].type == PEER_FILE) {
732                                                 if (netflow->SeqOffset)
733                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
734 #define MESSAGES
735                                                 peers[0].write_fd = get_log_fd(peers[0].fname, peers[0].write_fd);
736                                                 ret = write(peers[0].write_fd, emit_packet, size);
737                                                 if (ret < size) {
738 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
739                                                         my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
740                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
741 #endif
742 #undef MESSAGES
743                                                 }
744 #if ((DEBUG) & DEBUG_E)
745                                                 commaneelse {
746                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
747                                                                 emit_count, i + 1, peers[i].seq);
748                                                 }
749 #endif
750                                                 peers[0].seq += emit_count;
751
752                                                 /* Rate limit */
753                                                 if (emit_rate_bytes) {
754                                                         sent += size;
755                                                         delay = sent / emit_rate_bytes;
756                                                         if (delay) {
757                                                                 sent %= emit_rate_bytes;
758                                                                 timeout.tv_sec = 0;
759                                                                 timeout.tv_nsec = emit_rate_delay * delay;
760                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
761                                                         }
762                                                 }
763                                         }
764                                 else
765                                 if (peers[i].type == PEER_MIRROR) goto sendreal;
766                                 else
767                                 if (peers[i].type == PEER_ROTATE) 
768                                         if (peer_rot_cur++ == peer_rot_work) {
769                                         sendreal:
770                                                 if (netflow->SeqOffset)
771                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
772                                                 ret = send(peers[i].write_fd, emit_packet, size, 0);
773                                                 if (ret < size) {
774 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
775                                                         my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
776                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
777 #endif
778                                                 }
779 #if ((DEBUG) & DEBUG_E)
780                                                 commaneelse {
781                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
782                                                                 emit_count, i + 1, peers[i].seq);
783                                                 }
784 #endif
785                                                 peers[i].seq += emit_count;
786
787                                                 /* Rate limit */
788                                                 if (emit_rate_bytes) {
789                                                         sent += size;
790                                                         delay = sent / emit_rate_bytes;
791                                                         if (delay) {
792                                                                 sent %= emit_rate_bytes;
793                                                                 timeout.tv_sec = 0;
794                                                                 timeout.tv_nsec = emit_rate_delay * delay;
795                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
796                                                         }
797                                                 }
798                                         }
799                         }
800                         if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
801                         emit_sequence += emit_count;
802                         emit_count = 0;
803 #if ((DEBUG) & DEBUG_I)
804                         emit_pkts++;
805 #endif
806                 }
807         }
808 }       
809
810 void *unpending_thread()
811 {
812         struct timeval now;
813         struct timespec timeout;
814 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
815         char logbuf[256];
816 #endif
817
818         setuser();
819
820         timeout.tv_nsec = 0;
821         pthread_mutex_lock(&unpending_mutex);
822
823         for (;;) {
824                 while (!(pending_tail->flags & FLOW_PENDING)) {
825                         gettimeofday(&now, 0);
826                         timeout.tv_sec = now.tv_sec + unpending_timeout;
827                         pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
828                 }
829
830 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
831                 *logbuf = 0;
832 #endif
833                 if (put_into(pending_tail, COPY_INTO
834 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
835                                 , logbuf
836 #endif
837                         ) < 0) {
838 #if ((DEBUG) & DEBUG_I)
839                         pkts_lost_unpending++;
840 #endif                          
841                 }
842
843 #if ((DEBUG) & DEBUG_U)
844                 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
845 #endif
846
847                 pending_tail->flags = 0;
848                 pending_tail = pending_tail->next;
849 #if ((DEBUG) & DEBUG_I)
850                 pkts_pending_done++;
851 #endif
852         }
853 }
854
855 void *scan_thread()
856 {
857 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
858         char logbuf[256];
859 #endif
860         int i;
861         struct Flow *flow, **flowpp;
862         struct Time now;
863         struct timespec timeout;
864
865         setuser();
866
867         timeout.tv_nsec = 0;
868         pthread_mutex_lock(&scan_mutex);
869
870         for (;;) {
871                 gettime(&now);
872                 timeout.tv_sec = now.sec + scan_interval;
873                 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
874
875                 gettime(&now);
876 #if ((DEBUG) & DEBUG_S)
877                 my_log(LOG_DEBUG, "S: %d", now.sec);
878 #endif
879                 for (i = 0; i < 1 << HASH_BITS ; i++) {
880                         pthread_mutex_lock(&flows_mutex[i]);
881                         flow = flows[i];
882                         flowpp = &flows[i];
883                         while (flow) {
884                                 if (flow->flags & FLOW_FRAG) {
885                                         /* Process fragmented flow */
886                                         if ((now.sec - flow->mtime.sec) > frag_lifetime) {
887                                                 /* Fragmented flow expired - put it into special chain */
888 #if ((DEBUG) & DEBUG_I)
889                                                 flows_fragmented--;
890                                                 flows_total--;
891 #endif
892                                                 *flowpp = flow->next;
893                                                 flow->id = 0;
894                                                 flow->flags &= ~FLOW_FRAG;
895                                                 flow->next = scan_frag_dreg;
896                                                 scan_frag_dreg = flow;
897                                                 flow = *flowpp;
898                                                 continue;
899                                         }
900                                 } else {
901                                         /* Flow is not frgamented */
902                                         if ((now.sec - flow->mtime.sec) > inactive_lifetime
903                                                 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
904                                                 /* Flow expired */
905 #if ((DEBUG) & DEBUG_S)
906                                                 my_log(LOG_DEBUG, "S: E %x", flow);
907 #endif
908 #if ((DEBUG) & DEBUG_I)
909                                                 flows_total--;
910 #endif
911                                                 *flowpp = flow->next;
912                                                 pthread_mutex_lock(&emit_mutex);
913                                                 flow->next = flows_emit;
914                                                 flows_emit = flow;
915 #if ((DEBUG) & DEBUG_I)
916                                                 emit_queue++;
917 #endif                          
918                                                 pthread_mutex_unlock(&emit_mutex);
919                                                 flow = *flowpp;
920                                                 continue;
921                                         }
922                                 }
923                                 flowpp = &flow->next;
924                                 flow = flow->next;
925                         } /* chain loop */
926                         pthread_mutex_unlock(&flows_mutex[i]);
927                 } /* hash loop */
928                 if (flows_emit) pthread_cond_signal(&emit_cond);
929
930                 while (scan_frag_dreg) {
931                         flow = scan_frag_dreg;
932                         scan_frag_dreg = flow->next;
933 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
934                         *logbuf = 0;
935 #endif
936                         put_into(flow, MOVE_INTO
937 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
938                                 , logbuf
939 #endif
940                         );
941 #if ((DEBUG) & DEBUG_S)
942                         my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
943 #endif
944                 }
945         }
946 }
947
948 void *cap_thread()
949 {
950         struct ulog_packet_msg *ulog_msg;
951         struct ip *nl;
952         void *tl;
953         struct Flow *flow;
954         int len, off_frag, psize;
955 #if ((DEBUG) & DEBUG_C)
956         char buf[64];
957         char logbuf[256];
958 #endif
959
960         setuser();
961
962         while (!killed) {
963                 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
964                 if (len <= 0) {
965                         my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
966                         continue;
967                 }
968                 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
969
970 #if ((DEBUG) & DEBUG_C)
971                         sprintf(logbuf, "C: %d", ulog_msg->data_len);
972 #endif
973
974                         nl = (void *) &ulog_msg->payload;
975                         psize = ulog_msg->data_len;
976
977                         /* Sanity check */
978                         if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
979 #if ((DEBUG) & DEBUG_C)
980                                 strcat(logbuf, " U");
981                                 my_log(LOG_DEBUG, "%s", logbuf);
982 #endif
983 #if ((DEBUG) & DEBUG_I)
984                                 pkts_ignored++;
985 #endif
986                                 continue;
987                         }
988
989                         if (pending_head->flags) {
990 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
991                                 my_log(LOG_ERR,
992 # if ((DEBUG) & DEBUG_C)
993                                         "%s %s %s", logbuf,
994 # else
995                                         "%s %s",
996 # endif
997                                         "pending queue full:", "packet lost");
998 #endif
999 #if ((DEBUG) & DEBUG_I)
1000                                 pkts_lost_capture++;
1001 #endif
1002                                 goto done;
1003                         }
1004
1005 #if ((DEBUG) & DEBUG_I)
1006                         pkts_total++;
1007 #endif
1008
1009                         flow = pending_head;
1010
1011                         /* ?FIXME? Add sanity check for ip_len? */
1012                         flow->size = ntohs(nl->ip_len);
1013 #if ((DEBUG) & DEBUG_I)
1014                         size_total += flow->size;
1015 #endif
1016
1017                         flow->sip = nl->ip_src;
1018                         flow->dip = nl->ip_dst;
1019                         flow->iif = snmp_index(ulog_msg->indev_name);
1020                         flow->oif = snmp_index(ulog_msg->outdev_name);
1021                         flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1022                         flow->proto = nl->ip_p;
1023                         flow->id = 0;
1024                         flow->tcp_flags = 0;
1025                         flow->pkts = 1;
1026                         flow->sizeF = 0;
1027                         flow->sizeP = 0;
1028                         /* Packets captured from OUTPUT table didn't contains valid timestamp */
1029                         if (ulog_msg->timestamp_sec) {
1030                                 flow->ctime.sec = ulog_msg->timestamp_sec;
1031                                 flow->ctime.usec = ulog_msg->timestamp_usec;
1032                         } else gettime(&flow->ctime);
1033                         flow->mtime = flow->ctime;
1034
1035                         off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1036
1037                         /*
1038                         Offset (from network layer) to transport layer header/IP data
1039                         IOW IP header size ;-)
1040
1041                         ?FIXME?
1042                         Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1043                         */
1044                         off_tl = nl->ip_hl << 2;
1045                         tl = (void *) nl + off_tl;
1046
1047                         /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1048                         flow->sizeF = ntohs(nl->ip_len) - off_tl;
1049                         psize -= off_tl;
1050                         if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1051                         if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1052
1053                         if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1054                                 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1055 #if ((DEBUG) & DEBUG_C)
1056                                 strcat(logbuf, " F");
1057 #endif
1058 #if ((DEBUG) & DEBUG_I)
1059                                 pkts_total_fragmented++;
1060 #endif
1061                                 flow->flags |= FLOW_FRAG;
1062                                 flow->id = nl->ip_id;
1063
1064                                 if (!(ntohs(nl->ip_off) & IP_MF)) {
1065                                         /* Packet whith IP_MF contains information about whole datagram size */
1066                                         flow->flags |= FLOW_LASTFRAG;
1067                                         /* size = frag_offset*8 + data_size */
1068                                         flow->sizeP = off_frag + flow->sizeF;
1069                                 }
1070                         }
1071
1072 #if ((DEBUG) & DEBUG_C)
1073                         sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1074                         strcat(logbuf, buf);
1075                         sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1076                         strcat(logbuf, buf);
1077 #endif
1078
1079                         /*
1080                         Fortunately most interesting transport layer information fit
1081                         into first 8 bytes of IP data field (minimal nonzero size).
1082                         Thus we don't need actual packet reassembling to build whole
1083                         transport layer data. We only check the fragment offset for
1084                         zero value to find packet with this information.
1085                         */
1086                         if (!off_frag && psize >= 8) {
1087                                 switch (flow->proto) {
1088                                         case IPPROTO_TCP:
1089                                         case IPPROTO_UDP:
1090                                                 flow->sp = ((struct udphdr *)tl)->uh_sport;
1091                                                 flow->dp = ((struct udphdr *)tl)->uh_dport;
1092                                                 goto tl_known;
1093
1094 #ifdef ICMP_TRICK
1095                                         case IPPROTO_ICMP:
1096                                                 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1097                                                 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1098                                                 goto tl_known;
1099 #endif
1100 #ifdef ICMP_TRICK_CISCO
1101                                         case IPPROTO_ICMP:
1102                                                 flow->dp = *((int32_t *) tl);
1103                                                 goto tl_known;
1104 #endif
1105
1106                                         default:
1107                                                 /* Unknown transport layer */
1108 #if ((DEBUG) & DEBUG_C)
1109                                                 strcat(logbuf, " U");
1110 #endif
1111                                                 flow->sp = 0;
1112                                                 flow->dp = 0;
1113                                                 break;
1114
1115                                         tl_known:
1116 #if ((DEBUG) & DEBUG_C)
1117                                                 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1118                                                 strcat(logbuf, buf);
1119 #endif
1120                                                 flow->flags |= FLOW_TL;
1121                                 }
1122                         }
1123
1124                         /* Check for tcp flags presence (including CWR and ECE). */
1125                         if (flow->proto == IPPROTO_TCP
1126                                 && off_frag < 16
1127                                 && psize >= 16 - off_frag) {
1128                                 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1129 #if ((DEBUG) & DEBUG_C)
1130                                 sprintf(buf, " TCP:%x", flow->tcp_flags);
1131                                 strcat(logbuf, buf);
1132 #endif
1133                         }
1134
1135 #if ((DEBUG) & DEBUG_C)
1136                         sprintf(buf, " => %x", (unsigned) flow);
1137                         strcat(logbuf, buf);
1138                         my_log(LOG_DEBUG, "%s", logbuf);
1139 #endif
1140
1141 #if ((DEBUG) & DEBUG_I)
1142                         pkts_pending++;
1143                         pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1144                         if (pending_queue_trace < pending_queue_trace_candidate)
1145                                 pending_queue_trace = pending_queue_trace_candidate;
1146 #endif
1147
1148                         /* Flow complete - inform unpending_thread() about it */
1149                         pending_head->flags |= FLOW_PENDING;
1150                         pending_head = pending_head->next;
1151                 done:
1152                         pthread_cond_signal(&unpending_cond);
1153                 }
1154         }
1155         return 0;
1156 }
1157
1158 int main(int argc, char **argv)
1159 {
1160         char errpbuf[512];
1161         char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1162         int c, i, write_fd, memory_limit = 0;
1163         struct addrinfo hints, *res;
1164         struct sockaddr_in saddr;
1165         pthread_attr_t tattr;
1166         struct sigaction sigact;
1167         static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1168         struct timeval timeout;
1169
1170         sched_min = sched_get_priority_min(SCHED);
1171         sched_max = sched_get_priority_max(SCHED);
1172
1173         memset(&saddr, 0 , sizeof(saddr));
1174         memset(&hints, 0 , sizeof(hints));
1175         hints.ai_flags = AI_PASSIVE;
1176         hints.ai_family = AF_INET;
1177         hints.ai_socktype = SOCK_DGRAM;
1178
1179         /* Process command line options */
1180
1181         opterr = 0;
1182         while ((c = my_getopt(argc, argv, parms)) != -1) {
1183                 switch (c) {
1184                         case '?':
1185                                 usage();
1186
1187                         case 'h':
1188                                 usage();
1189                 }
1190         }
1191
1192         if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1193         if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1194         if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1195         if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1196         if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1197         if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1198         if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1199         if (parms[nflag].count) {
1200                 switch (atoi(parms[nflag].arg)) {
1201                         case 1:
1202                                 netflow = &NetFlow1;
1203                                 break;
1204
1205                         case 5:
1206                                 break;
1207
1208                         case 7:
1209                                 netflow = &NetFlow7;
1210                                 break;
1211
1212                         default:
1213                                 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1214                                 exit(1);
1215                 }
1216         }
1217         if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1218         if (parms[lflag].count) {
1219                 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1220                         *log_suffix++ = 0;
1221                         if (*log_suffix) {
1222                                 sprintf(errpbuf, "[%s]", log_suffix);
1223                                 strcat(ident, errpbuf);
1224                         }
1225                 }
1226                 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1227                 if (log_suffix) *--log_suffix = ':';
1228         }
1229         if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1230         err_malloc:
1231                 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1232                 exit(1);
1233         }
1234         sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1235         if (parms[qflag].count) {
1236                 pending_queue_length = atoi(parms[qflag].arg);
1237                 if (pending_queue_length < 1) {
1238                         fprintf(stderr, "Illegal %s\n", "pending queue length");
1239                         exit(1);
1240                 }
1241         }
1242         if (parms[rflag].count) {
1243                 schedp.sched_priority = atoi(parms[rflag].arg);
1244                 if (schedp.sched_priority
1245                         && (schedp.sched_priority < sched_min
1246                                 || schedp.sched_priority > sched_max)) {
1247                         fprintf(stderr, "Illegal %s\n", "realtime priority");
1248                         exit(1);
1249                 }
1250         }
1251         if (parms[Bflag].count) {
1252                 sockbufsize = atoi(parms[Bflag].arg) << 10;
1253         }
1254         if (parms[bflag].count) {
1255                 bulk_quantity = atoi(parms[bflag].arg);
1256                 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1257                         fprintf(stderr, "Illegal %s\n", "bulk size");
1258                         exit(1);
1259                 }
1260         }
1261         if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1262         if (parms[Xflag].count) {
1263                 for(i = 0; parms[Xflag].arg[i]; i++)
1264                         if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1265                 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1266                         goto err_malloc;
1267                 rule = strtok(parms[Xflag].arg, ":");
1268                 for (i = 0; rule; i++) {
1269                         snmp_rules[i].len = strlen(rule);
1270                         if (snmp_rules[i].len > IFNAMSIZ) {
1271                                 fprintf(stderr, "Illegal %s\n", "interface basename");
1272                                 exit(1);
1273                         }
1274                         strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1275                         if (!*(rule - 1)) *(rule - 1) = ',';
1276                         rule = strtok(NULL, ",");
1277                         if (!rule) {
1278                                 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1279                                 exit(1);
1280                         }
1281                         snmp_rules[i].base = atoi(rule);
1282                         *(rule - 1) = ':';
1283                         rule = strtok(NULL, ":");
1284                 }
1285                 nsnmp_rules = i;
1286         }
1287         if (parms[tflag].count)
1288                 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1289         if (parms[aflag].count) {
1290                 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1291                 bad_lhost:
1292                         fprintf(stderr, "Illegal %s\n", "source address");
1293                         exit(1);
1294                 } else {
1295                         saddr = *((struct sockaddr_in *) res->ai_addr);
1296                         freeaddrinfo(res);
1297                 }
1298         }
1299         if (parms[uflag].count) 
1300                 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1301                         fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1302                         exit(1);
1303                 }
1304
1305
1306         /* Process collectors parameters. Brrrr... :-[ */
1307
1308         npeers = argc - optind;
1309         if (npeers > 1) {
1310                 /* Send to remote Netflow collector */
1311                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1312                 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1313                         dhost = argv[i];
1314                         if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1315                         *dport++ = 0;
1316                         if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1317                                 fprintf(stderr, "socket(): %s\n", strerror(errno));
1318                                 exit(1);
1319                         }
1320                         peers[npeers].write_fd = write_fd;
1321                         peers[npeers].type = PEER_MIRROR;
1322                         peers[npeers].laddr = saddr;
1323                         peers[npeers].seq = 0;
1324                         if ((lhost = strchr(dport, '/'))) {
1325                                 *lhost++ = 0;
1326                                 if ((type = strchr(lhost, '/'))) {
1327                                         *type++ = 0;
1328                                         switch (*type) {
1329                                                 case 0:
1330                                                 case 'm':
1331                                                         break;
1332
1333                                                 case 'r':
1334                                                         peers[npeers].type = PEER_ROTATE;
1335                                                         npeers_rot++;
1336                                                         break;
1337
1338                                                 default:
1339                                                         goto bad_collector;
1340                                         }
1341                                 }
1342                                 if (*lhost) {
1343                                         if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1344                                         peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1345                                         freeaddrinfo(res);
1346                                 }
1347                         }
1348                         if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1349                                                 sizeof(struct sockaddr_in))) {
1350                                 fprintf(stderr, "bind(): %s\n", strerror(errno));
1351                                 exit(1);
1352                         }
1353                         if (getaddrinfo(dhost, dport, &hints, &res)) {
1354 bad_collector:
1355                                 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1356                                 exit(1);
1357                         }
1358                         peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1359                         freeaddrinfo(res);
1360                         if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1361                                                 sizeof(struct sockaddr_in))) {
1362                                 fprintf(stderr, "connect(): %s\n", strerror(errno));
1363                                 exit(1);
1364                         }
1365
1366                         /* Restore command line */
1367                         if (type) *--type = '/';
1368                         if (lhost) *--lhost = '/';
1369                         *--dport = ':';
1370                 }
1371         }
1372         else if (parms[fflag].count) {
1373                 // log into a file
1374                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1375                 if (!(peers[0].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1376                 strncpy(peers[0].fname, parms[fflag].arg, MAX_PATH_LEN);
1377                 
1378                 peers[0].write_fd = -1;
1379                 peers[0].type = PEER_FILE;
1380                 peers[0].seq = 0;
1381                 npeers++;
1382         }
1383         else 
1384                 usage();
1385
1386
1387         if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1388         ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1389         if (!ulog_handle) {
1390                 fprintf(stderr, "libipulog initialization error: %s",
1391                         ipulog_strerror(ipulog_errno));
1392                 exit(1);
1393         }
1394         if (sockbufsize)
1395                 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1396                         &sockbufsize, sizeof(sockbufsize)) < 0)
1397                         fprintf(stderr, "setsockopt(): %s", strerror(errno));
1398
1399         /* Daemonize (if log destination stdout-free) */
1400
1401         my_log_open(ident, verbosity, log_dest);
1402         if (!(log_dest & 2)) {
1403                 switch (fork()) {
1404                         case -1:
1405                                 fprintf(stderr, "fork(): %s", strerror(errno));
1406                                 exit(1);
1407
1408                         case 0:
1409                                 setsid();
1410                                 freopen("/dev/null", "r", stdin);
1411                                 freopen("/dev/null", "w", stdout);
1412                                 freopen("/dev/null", "w", stderr);
1413                                 break;
1414
1415                         default:
1416                                 exit(0);
1417                 }
1418         } else {
1419                 setvbuf(stdout, (char *)0, _IONBF, 0);
1420                 setvbuf(stderr, (char *)0, _IONBF, 0);
1421         }
1422
1423         pid = getpid();
1424         sprintf(errpbuf, "[%ld]", (long) pid);
1425         strcat(ident, errpbuf);
1426
1427         /* Initialization */
1428
1429         hash_init(); /* Actually for crc16 only */
1430         mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1431         for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1432
1433 #ifdef UPTIME_TRICK
1434         /* Hope 12 days is enough :-/ */
1435         start_time_offset = 1 << 20;
1436
1437         /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1438 #endif
1439         gettime(&start_time);
1440
1441         /*
1442         Build static pending queue as circular buffer.
1443         */
1444         if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1445         pending_tail = pending_head;
1446         for (i = pending_queue_length - 1; i--;) {
1447                 if (!(pending_tail->next = mem_alloc())) {
1448                 err_mem_alloc:
1449                         my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1450                         exit(1);
1451                 }
1452                 pending_tail = pending_tail->next;
1453         }
1454         pending_tail->next = pending_head;
1455         pending_tail = pending_head;
1456
1457         sigemptyset(&sig_mask);
1458         sigact.sa_handler = &sighandler;
1459         sigact.sa_mask = sig_mask;
1460         sigact.sa_flags = 0;
1461         sigaddset(&sig_mask, SIGTERM);
1462         sigaction(SIGTERM, &sigact, 0);
1463 #if ((DEBUG) & DEBUG_I)
1464         sigaddset(&sig_mask, SIGUSR1);
1465         sigaction(SIGUSR1, &sigact, 0);
1466 #endif
1467         if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1468                 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1469                 exit(1);
1470         }
1471
1472         my_log(LOG_INFO, "Starting %s...", VERSION);
1473
1474         if (parms[cflag].count) {
1475                 if (chdir(parms[cflag].arg) || chroot(".")) {
1476                         my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1477                         exit(1);
1478                 }
1479         }
1480
1481         schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1482         pthread_attr_init(&tattr);
1483         for (i = 0; i < THREADS - 1; i++) {
1484                 if (schedp.sched_priority > 0) {
1485                         if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1486                                 (pthread_attr_setschedparam(&tattr, &schedp))) {
1487                                 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1488                                 exit(1);
1489                         }
1490                 }
1491                 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1492                         my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1493                         exit(1);
1494                 }
1495                 pthread_detach(thid);
1496                 schedp.sched_priority++;
1497         }
1498
1499         if (pw) {
1500                 if (setgroups(0, NULL)) {
1501                         my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1502                         exit(1);
1503                 }
1504                 if (setregid(pw->pw_gid, pw->pw_gid)) {
1505                         my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1506                         exit(1);
1507                 }
1508                 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1509                         my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1510                         exit(1);
1511                 }
1512         }
1513
1514         if (!(pidfile = fopen(pidfilepath, "w")))
1515                 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1516         else {
1517                 fprintf(pidfile, "%ld\n", (long) pid);
1518                 fclose(pidfile);
1519         }
1520
1521         my_log(LOG_INFO, "pid: %d", pid);
1522         my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1523                 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1524                 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1525                 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1526                 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1527                 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1528                 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1529         for (i = 0; i < nsnmp_rules; i++) {
1530                 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1531                         i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1532         }
1533         for (i = 0; i < npeers; i++) {
1534                 switch (peers[i].type) {
1535                         case PEER_MIRROR:
1536                                 c = 'm';
1537                                 break;
1538                         case PEER_ROTATE:
1539                                 c = 'r';
1540                                 break;
1541                 }
1542                 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1543                 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1544                         inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1545         }
1546
1547         pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1548
1549         timeout.tv_usec = 0;
1550         while (!killed
1551                 || (total_elements - free_elements - pending_queue_length)
1552                 || emit_count
1553                 || pending_tail->flags) {
1554
1555                 if (!sigs) {
1556                         timeout.tv_sec = scan_interval;
1557                         select(0, 0, 0, 0, &timeout);
1558                 }
1559
1560                 if (sigs & SIGTERM_MASK && !killed) {
1561                         sigs &= ~SIGTERM_MASK;
1562                         my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1563                         scan_interval = 1;
1564                         frag_lifetime = -1;
1565                         active_lifetime = -1;
1566                         inactive_lifetime = -1;
1567                         emit_timeout = 1;
1568                         unpending_timeout = 1;
1569                         killed = 1;
1570                         pthread_cond_signal(&scan_cond);
1571                         pthread_cond_signal(&unpending_cond);
1572                 }
1573
1574 #if ((DEBUG) & DEBUG_I)
1575                 if (sigs & SIGUSR1_MASK) {
1576                         sigs &= ~SIGUSR1_MASK;
1577                         info_debug();
1578                 }
1579 #endif
1580         }
1581         remove(pidfilepath);
1582 #if ((DEBUG) & DEBUG_I)
1583         info_debug();
1584 #endif
1585         my_log(LOG_INFO, "Done.");
1586 #ifdef WALL
1587         return 0;
1588 #endif
1589 }