Keep track of current log file in case fprobe is restarted
[fprobe-ulog.git] / src / fprobe-ulog.c
1 /*
2         Copyright (C) Slava Astashonok <sla@0n.ru>
3
4         This program is free software; you can redistribute it and/or
5         modify it under the terms of the GNU General Public License.
6
7         $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
8
9         7/11/2007       Sapan Bhatia <sapanb@cs.princeton.edu> 
10                         
11                 Added data collection (-f) functionality, xid support in the header and log file
12                 rotation.
13 */
14
15 #include <common.h>
16
17 /* stdout, stderr, freopen() */
18 #include <stdio.h>
19
20 /* atoi(), exit() */
21 #include <stdlib.h>
22
23 /* getopt(), alarm(), getpid(), sedsid(), chdir() */
24 #include <unistd.h>
25
26 /* strerror() */
27 #include <string.h>
28
29 /* sig*() */
30 #include <signal.h>
31
32 #include <libipulog/libipulog.h>
/*
 * Local definition of libipulog's handle structure. A second complete
 * definition after including <libipulog/libipulog.h> only compiles if that
 * header leaves struct ipulog_handle incomplete, so this copy presumably
 * exists to let this file reach the handle's fields directly.
 * NOTE(review): the field layout must match the libipulog build being
 * linked against - confirm whenever libipulog is upgraded.
 */
struct ipulog_handle {
	int fd;			/* presumably the netlink socket descriptor */
	u_int8_t blocking;
	struct sockaddr_nl local;
	struct sockaddr_nl peer;
	struct nlmsghdr* last_nlhdr;
};
40
41 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
42 #include <sys/types.h>
43 #include <netinet/in_systm.h>
44 #include <sys/socket.h>
45 #include <netinet/in.h>
46 #include <arpa/inet.h>
47 #include <netinet/ip.h>
48 #include <netinet/tcp.h>
49 #include <netinet/udp.h>
50 #include <netinet/ip_icmp.h>
51 #include <net/if.h>
52
53 #include <sys/param.h>
54 #include <pwd.h>
55 #ifdef OS_LINUX
56 #include <grp.h>
57 #endif
58
59 /* pthread_*() */
60 #include <pthread.h>
61
62 /* errno */
63 #include <errno.h>
64
65 /* getaddrinfo() */
66 #include <netdb.h>
67
68 /* nanosleep() */
69 #include <time.h>
70
71 /* gettimeofday() */
72 #include <sys/time.h>
73
74 /* scheduling */
75 #include <sched.h>
76
77 /* select() (POSIX)*/
78 #include <sys/select.h>
79
80 /* open() */
81 #include <sys/stat.h>
82 #include <fcntl.h>
83
84 #include <fprobe-ulog.h>
85 #include <my_log.h>
86 #include <my_getopt.h>
87 #include <netflow.h>
88 #include <hash.h>
89 #include <mem.h>
90
/*
 * Option indices. Each enumerator is used as a subscript into parms[]
 * (e.g. mark_is_tos expands to parms[Mflag].count), so the enumerator
 * order must match the order of the entries in that table.
 */
enum {
	aflag, Bflag, bflag, cflag, dflag, eflag,
	Eflag, fflag, gflag, hflag, lflag, mflag,
	Mflag, nflag, qflag, rflag, sflag, tflag,
	Tflag, Uflag, uflag, vflag, Wflag, Xflag,
};
117
118 static struct getopt_parms parms[] = {
119         {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
120         {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
121         {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
122         {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
123         {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
124         {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
125         {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
126         {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
127         {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
128         {'h', 0, 0, 0},
129         {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
130         {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
131         {'M', 0, 0, 0},
132         {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
133         {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
134         {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
135         {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
136         {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
137         {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
138         {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
139         {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
140         {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
141         {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
142         {0, 0, 0, 0}
143 };
144
/*
 * getopt() state variables (also declared by <unistd.h>) and the NetFlow
 * format descriptors selected with -n, which are defined in another
 * translation unit. errno comes from <errno.h> and may be a macro, so it
 * must not be redeclared here (C11 7.5p2).
 */
extern char *optarg;
extern int optind, opterr, optopt;

extern struct NetFlow NetFlow1;
extern struct NetFlow NetFlow5;
extern struct NetFlow NetFlow7;
151 extern struct NetFlow NetFlow7;
152
/* Non-zero when -M was given ("Use netfilter mark value as ToS flag"). */
#define mark_is_tos parms[Mflag].count
/* Flow timers in seconds; defaults match the usage() text for -s, -g and -e. */
static unsigned scan_interval = 5;
static int frag_lifetime = 30;
static int inactive_lifetime = 60;	/* presumably the inactive timer - confirm against option parsing */
static int active_lifetime = 300;
static int sockbufsize;			/* presumably set from -B (kernel capture buffer size) */
/* Largest bulk size representable by mem_index_t; reported by usage() for -b. */
#define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
#if (MEM_BITS == 0) || (MEM_BITS == 16)
#define BULK_QUANTITY 10000
#else
#define BULK_QUANTITY 200
#endif

/*
 * Log rotation state used by get_log_fd(): epoch_length is in minutes,
 * log files are named <fname>.<cur_epoch> with cur_epoch taken modulo
 * log_epochs, and prev_uptime is the uptime in minutes at the last rotation.
 */
static unsigned epoch_length=60, log_epochs=1;
static unsigned cur_epoch=0,prev_uptime=0;

static unsigned bulk_quantity = BULK_QUANTITY;
static unsigned pending_queue_length = 100;	/* -q, see usage() */
static struct NetFlow *netflow = &NetFlow5;	/* export format, NetFlow v5 unless -n says otherwise */
static unsigned verbosity = 6;			/* -v maximum log level */
static unsigned log_dest = MY_LOG_SYSLOG;
/* Reference point for getuptime()/getuptime_minutes(). */
static struct Time start_time;
static long start_time_offset;	/* subtracted from start_time.sec in the UPTIME_TRICK path of emit_thread() */
static int off_tl;
177 /* From mem.c */
178 extern unsigned total_elements;
179 extern unsigned free_elements;
180 extern unsigned total_memory;
181 #if ((DEBUG) & DEBUG_I)
182 static unsigned emit_pkts, emit_queue;
183 static uint64_t size_total;
184 static unsigned pkts_total, pkts_total_fragmented;
185 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
186 static unsigned pkts_pending, pkts_pending_done;
187 static unsigned pending_queue_trace, pending_queue_trace_candidate;
188 static unsigned flows_total, flows_fragmented;
189 #endif
190 static unsigned emit_count;
191 static uint32_t emit_sequence;
192 static unsigned emit_rate_bytes, emit_rate_delay;
193 static struct Time emit_time;
194 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
195 static pthread_t thid;
196 static sigset_t sig_mask;
197 static struct sched_param schedp;
198 static int sched_min, sched_max;
199 static int npeers, npeers_rot;
200 static struct peer *peers;
201 static int sigs;
202
/* Flow hash table: put_into() locks flows_mutex[h] while it edits chain flows[h]. */
static struct Flow *flows[1 << HASH_BITS];
static pthread_mutex_t flows_mutex[1 << HASH_BITS];

static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;

static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
static struct Flow *pending_head, *pending_tail;
static struct Flow *scan_frag_dreg;

/* Queue consumed by emit_thread(): flows_emit is popped under emit_mutex, which waits on emit_cond. */
static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
static struct Flow *flows_emit;

static char ident[256] = "fprobe-ulog";	/* presumably the log/pidfile identifier (-l) */
static FILE *pidfile;
static char *pidfilepath;
static pid_t pid;
static int killed;
/* emit_timeout: seconds emit_thread() waits on emit_cond before flushing a partial packet. */
static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
static struct ipulog_handle *ulog_handle;
static uint32_t ulog_gmask = 1;		/* -U ULOG group bitwise mask, default 1 */
static char *cap_buf;
/* -X interface-name to SNMP-index rules consulted by snmp_index(). */
static int nsnmp_rules;
static struct snmp_rule *snmp_rules;
/* Target account for setuser(); 0 means keep the current credentials. */
static struct passwd *pw = 0;
230
231 void usage()
232 {
233         fprintf(stdout,
234                 "fprobe-ulog: a NetFlow probe. Version %s\n"
235                 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
236                 "\n"
237                 "-h\t\tDisplay this help\n"
238                 "-U <mask>\tULOG group bitwise mask [1]\n"
239                 "-s <seconds>\tHow often scan for expired flows [5]\n"
240                 "-g <seconds>\tFragmented flow lifetime [30]\n"
241                 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
242                 "-f <filename>\tLog flow data in a file\n"
243                 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
244                 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
245                 "-a <address>\tUse <address> as source for NetFlow flow\n"
246                 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
247                 "-M\t\tUse netfilter mark value as ToS flag\n"
248                 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
249                 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
250                 "-q <flows>\tPending queue length [100]\n"
251                 "-B <kilobytes>\tKernel capture buffer size [0]\n"
252                 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
253                 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
254                 "-c <directory>\tDirectory to chroot to\n"
255                 "-u <user>\tUser to run as\n"
256                 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
257                 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
258                 "-y <remote:port>\tAddress of the NetFlow collector\n"
259                 "-f <writable file>\tFile to write data into\n"
260                 "-T <n>\tRotate log file every n epochs\n"
261                 "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
262                 "-E <[1..60]>\tSize of an epoch in minutes\n"
263                 ,
264                 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
265         exit(0);
266 }
267
268 #if ((DEBUG) & DEBUG_I)
269 void info_debug()
270 {
271         my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
272                 pkts_total, pkts_total_fragmented, size_total,
273                 pkts_pending - pkts_pending_done, pending_queue_trace);
274         my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
275                 pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
276         my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
277                 flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
278         my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
279                 total_elements, free_elements, total_memory);
280 }
281 #endif
282
283 void sighandler(int sig)
284 {
285         switch (sig) {
286                 case SIGTERM:
287                         sigs |= SIGTERM_MASK;
288                         break;
289 #if ((DEBUG) & DEBUG_I)
290                 case SIGUSR1:
291                         sigs |= SIGUSR1_MASK;
292                         break;
293 #endif
294         }
295 }
296
297 void gettime(struct Time *now)
298 {
299         struct timeval t;
300
301         gettimeofday(&t, 0);
302         now->sec = t.tv_sec;
303         now->usec = t.tv_usec;
304 }
305
306
307 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
308 {
309         return (t1->sec - t2->sec)/60;
310 }
311
312 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
313 {
314         return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
315 }
316
317 /* Uptime in miliseconds */
318 uint32_t getuptime(struct Time *t)
319 {
320         /* Maximum uptime is about 49/2 days */
321         return cmpmtime(t, &start_time);
322 }
323
324 /* Uptime in minutes */
325 uint32_t getuptime_minutes(struct Time *t)
326 {
327         /* Maximum uptime is about 49/2 days */
328         return cmpMtime(t, &start_time);
329 }
330
331 hash_t hash_flow(struct Flow *flow)
332 {
333         if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
334         else return hash(flow, sizeof(struct Flow_TL));
335 }
336
337 uint16_t snmp_index(char *name) {
338         uint32_t i;
339
340         if (!*name) return 0;
341
342         for (i = 0; (int) i < nsnmp_rules; i++) {
343                 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
344                 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
345         }
346
347         if ((i = if_nametoindex(name))) return i;
348
349         return -1;
350 }
351
352 inline void copy_flow(struct Flow *src, struct Flow *dst)
353 {
354         dst->iif = src->iif;
355         dst->oif = src->oif;
356         dst->sip = src->sip;
357         dst->dip = src->dip;
358         dst->tos = src->tos;
359         dst->proto = src->proto;
360         dst->tcp_flags = src->tcp_flags;
361         dst->id = src->id;
362         dst->sp = src->sp;
363         dst->dp = src->dp;
364         dst->pkts = src->pkts;
365         dst->size = src->size;
366         dst->sizeF = src->sizeF;
367         dst->sizeP = src->sizeP;
368         dst->ctime = src->ctime;
369         dst->mtime = src->mtime;
370         dst->flags = src->flags;
371 }
372
/*
 * Record the current log epoch n in /tmp/fprobe_last_epoch so that it can
 * be recovered after a restart (presumably fed back through -W).
 *
 * - O_TRUNC: the file is rewritten in place, so without truncation writing
 *   "3" over an earlier "12" would leave "32".
 * - An explicit mode is passed because O_CREAT requires one.
 * - The byte count written is the length actually formatted, never more
 *   than the buffer holds.
 * Failures are logged and otherwise ignored (best effort).
 *
 * NOTE(review): a fixed file name in world-writable /tmp is open to symlink
 * attacks when running as root; consider a private state directory.
 */
void update_cur_epoch_file(int n) {
	int fd, len;
	char snum[32];	/* comfortably larger than any decimal int plus NUL */

	len = snprintf(snum, sizeof(snum), "%d", n);
	if (len < 0 || (size_t) len >= sizeof(snum)) {
		my_log(LOG_ERR, "snprintf() failed for epoch %d", n);
		return;
	}
	fd = open("/tmp/fprobe_last_epoch", O_WRONLY|O_CREAT|O_TRUNC, 0644);
	if (fd == -1) {
		my_log(LOG_ERR, "open() failed: /tmp/fprobe_last_epoch");
		return;
	}
	if (write(fd, snum, len) != len)
		my_log(LOG_ERR, "write() failed: /tmp/fprobe_last_epoch");
	close(fd);
}
385
386 unsigned get_log_fd(char *fname, unsigned cur_fd) {
387         struct Time now;
388         unsigned cur_uptime;
389         int ret_fd;
390         gettime(&now);
391         cur_uptime = getuptime_minutes(&now);
392
393         /* Epoch length in minutes */
394         if ((cur_uptime - prev_uptime) > epoch_length || cur_fd==-1) {
395                 char nextname[MAX_PATH_LEN];
396                 int write_fd;
397                 prev_uptime = cur_uptime;
398                 cur_epoch = (cur_epoch + 1) % log_epochs;
399                 close(cur_fd);
400                 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
401                 if ((write_fd = open(nextname, O_WRONLY|O_CREAT)) < 0) {
402                         my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
403                         exit(1);
404                 }
405                 update_cur_epoch_file(cur_epoch);
406                 ret_fd = write_fd;
407         }
408         else
409                 ret_fd = cur_fd;
410         return(ret_fd);
411 }
412
413 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
414 {
415         struct Flow **flowpp;
416
417 #ifdef WALL
418         flowpp = 0;
419 #endif
420
421         if (prev) flowpp = *prev;
422
423         while (where) {
424                 if (where->sip.s_addr == what->sip.s_addr
425                         && where->dip.s_addr == what->dip.s_addr
426                         && where->proto == what->proto) {
427                         switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
428                                 case 0:
429                                         /* Both unfragmented */
430                                         if ((what->sp == where->sp)
431                                                 && (what->dp == where->dp)) goto done;
432                                         break;
433                                 case 2:
434                                         /* Both fragmented */
435                                         if (where->id == what->id) goto done;
436                                         break;
437                         }
438                 }
439                 flowpp = &where->next;
440                 where = where->next;
441         }
442 done:
443         if (prev) *prev = flowpp;
444         return where;
445 }
446
447 int put_into(struct Flow *flow, int flag
448 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
449         , char *logbuf
450 #endif
451 )
452 {
453         int ret = 0;
454         hash_t h;
455         struct Flow *flown, **flowpp;
456 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
457         char buf[64];
458 #endif
459
460         h = hash_flow(flow);
461 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
462         sprintf(buf, " %x H:%04x", (unsigned) flow, h);
463         strcat(logbuf, buf);
464 #endif
465         pthread_mutex_lock(&flows_mutex[h]);
466         flowpp = &flows[h];
467         if (!(flown = find(flows[h], flow, &flowpp))) {
468                 /* No suitable flow found - add */
469                 if (flag == COPY_INTO) {
470                         if ((flown = mem_alloc())) {
471                                 copy_flow(flow, flown);
472                                 flow = flown;
473                         } else {
474 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
475                                 my_log(LOG_ERR, "%s %s. %s",
476                                         "mem_alloc():", strerror(errno), "packet lost");
477 #endif
478                                 return -1;
479                         }
480                 }
481                 flow->next = flows[h];
482                 flows[h] = flow;
483 #if ((DEBUG) & DEBUG_I)
484                 flows_total++;
485                 if (flow->flags & FLOW_FRAG) flows_fragmented++;
486 #endif
487 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
488                 if (flown) {
489                         sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
490                         strcat(logbuf, buf);
491                 }
492 #endif
493         } else {
494                 /* Found suitable flow - update */
495 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
496                 sprintf(buf, " +> %x", (unsigned) flown);
497                 strcat(logbuf, buf);
498 #endif
499                 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
500                         flown->mtime = flow->mtime;
501                 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
502                         flown->ctime = flow->ctime;
503                 flown->tcp_flags |= flow->tcp_flags;
504                 flown->size += flow->size;
505                 flown->pkts += flow->pkts;
506                 if (flow->flags & FLOW_FRAG) {
507                         /* Fragmented flow require some additional work */
508                         if (flow->flags & FLOW_TL) {
509                                 /*
510                                 ?FIXME?
511                                 Several packets with FLOW_TL (attack)
512                                 */
513                                 flown->sp = flow->sp;
514                                 flown->dp = flow->dp;
515                         }
516                         if (flow->flags & FLOW_LASTFRAG) {
517                                 /*
518                                 ?FIXME?
519                                 Several packets with FLOW_LASTFRAG (attack)
520                                 */
521                                 flown->sizeP = flow->sizeP;
522                         }
523                         flown->flags |= flow->flags;
524                         flown->sizeF += flow->sizeF;
525                         if ((flown->flags & FLOW_LASTFRAG)
526                                 && (flown->sizeF >= flown->sizeP)) {
527                                 /* All fragments received - flow reassembled */
528                                 *flowpp = flown->next;
529                                 pthread_mutex_unlock(&flows_mutex[h]);
530 #if ((DEBUG) & DEBUG_I)
531                                 flows_total--;
532                                 flows_fragmented--;
533 #endif
534                                 flown->id = 0;
535                                 flown->flags &= ~FLOW_FRAG;
536 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
537                                 strcat(logbuf," R");
538 #endif
539                                 ret = put_into(flown, MOVE_INTO
540 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
541                                                 , logbuf
542 #endif
543                                         );
544                         }
545                 }
546                 if (flag == MOVE_INTO) mem_free(flow);
547         }
548         pthread_mutex_unlock(&flows_mutex[h]);
549         return ret;
550 }
551
552 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
553 {
554         int i;
555
556         for (i = 0; i < fields; i++) {
557 #if ((DEBUG) & DEBUG_F)
558                 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
559 #endif
560                 switch (format[i]) {
561                         case NETFLOW_IPV4_SRC_ADDR:
562                                 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
563                                 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
564                                 break;
565
566                         case NETFLOW_IPV4_DST_ADDR:
567                                 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
568                                 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
569                                         my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
570                                 }
571                                 p += NETFLOW_IPV4_DST_ADDR_SIZE;
572                                 break;
573
574                         case NETFLOW_INPUT_SNMP:
575                                 *((uint16_t *) p) = htons(flow->iif);
576                                 p += NETFLOW_INPUT_SNMP_SIZE;
577                                 break;
578
579                         case NETFLOW_OUTPUT_SNMP:
580                                 *((uint16_t *) p) = htons(flow->oif);
581                                 p += NETFLOW_OUTPUT_SNMP_SIZE;
582                                 break;
583
584                         case NETFLOW_PKTS_32:
585                                 *((uint32_t *) p) = htonl(flow->pkts);
586                                 p += NETFLOW_PKTS_32_SIZE;
587                                 break;
588
589                         case NETFLOW_BYTES_32:
590                                 *((uint32_t *) p) = htonl(flow->size);
591                                 p += NETFLOW_BYTES_32_SIZE;
592                                 break;
593
594                         case NETFLOW_FIRST_SWITCHED:
595                                 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
596                                 p += NETFLOW_FIRST_SWITCHED_SIZE;
597                                 break;
598
599                         case NETFLOW_LAST_SWITCHED:
600                                 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
601                                 p += NETFLOW_LAST_SWITCHED_SIZE;
602                                 break;
603
604                         case NETFLOW_L4_SRC_PORT:
605                                 *((uint16_t *) p) = flow->sp;
606                                 p += NETFLOW_L4_SRC_PORT_SIZE;
607                                 break;
608
609                         case NETFLOW_L4_DST_PORT:
610                                 *((uint16_t *) p) = flow->dp;
611                                 p += NETFLOW_L4_DST_PORT_SIZE;
612                                 break;
613
614                         case NETFLOW_PROT:
615                                 *((uint8_t *) p) = flow->proto;
616                                 p += NETFLOW_PROT_SIZE;
617                                 break;
618
619                         case NETFLOW_SRC_TOS:
620                                 *((uint8_t *) p) = flow->tos;
621                                 p += NETFLOW_SRC_TOS_SIZE;
622                                 break;
623
624                         case NETFLOW_TCP_FLAGS:
625                                 *((uint8_t *) p) = flow->tcp_flags;
626                                 p += NETFLOW_TCP_FLAGS_SIZE;
627                                 break;
628
629                         case NETFLOW_VERSION:
630                                 *((uint16_t *) p) = htons(netflow->Version);
631                                 p += NETFLOW_VERSION_SIZE;
632                                 break;
633
634                         case NETFLOW_COUNT:
635                                 *((uint16_t *) p) = htons(emit_count);
636                                 p += NETFLOW_COUNT_SIZE;
637                                 break;
638
639                         case NETFLOW_UPTIME:
640                                 *((uint32_t *) p) = htonl(getuptime(&emit_time));
641                                 p += NETFLOW_UPTIME_SIZE;
642                                 break;
643
644                         case NETFLOW_UNIX_SECS:
645                                 *((uint32_t *) p) = htonl(emit_time.sec);
646                                 p += NETFLOW_UNIX_SECS_SIZE;
647                                 break;
648
649                         case NETFLOW_UNIX_NSECS:
650                                 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
651                                 p += NETFLOW_UNIX_NSECS_SIZE;
652                                 break;
653
654                         case NETFLOW_FLOW_SEQUENCE:
655                                 //*((uint32_t *) p) = htonl(emit_sequence);
656                                 *((uint32_t *) p) = 0;
657                                 p += NETFLOW_FLOW_SEQUENCE_SIZE;
658                                 break;
659
660                         case NETFLOW_PAD8:
661                         /* Unsupported (uint8_t) */
662                         case NETFLOW_ENGINE_TYPE:
663                         case NETFLOW_ENGINE_ID:
664                         case NETFLOW_FLAGS7_1:
665                         case NETFLOW_SRC_MASK:
666                         case NETFLOW_DST_MASK:
667                                 *((uint8_t *) p) = 0;
668                                 p += NETFLOW_PAD8_SIZE;
669                                 break;
670                         case NETFLOW_XID:
671                                 *((uint16_t *) p) = flow->tos;
672                                 p += NETFLOW_XID_SIZE;
673                                 break;
674                         case NETFLOW_PAD16:
675                         /* Unsupported (uint16_t) */
676                         case NETFLOW_SRC_AS:
677                         case NETFLOW_DST_AS:
678                         case NETFLOW_FLAGS7_2:
679                                 *((uint16_t *) p) = 0;
680                                 p += NETFLOW_PAD16_SIZE;
681                                 break;
682
683                         case NETFLOW_PAD32:
684                         /* Unsupported (uint32_t) */
685                         case NETFLOW_IPV4_NEXT_HOP:
686                         case NETFLOW_ROUTER_SC:
687                                 *((uint32_t *) p) = 0;
688                                 p += NETFLOW_PAD32_SIZE;
689                                 break;
690
691                         default:
692                                 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
693                                         format, i, format[i]);
694                                 exit(1);
695                 }
696         }
697 #if ((DEBUG) & DEBUG_F)
698         my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
699 #endif
700         return p;
701 }
702
703 void setuser() {
704         /*
705         Workaround for clone()-based threads
706         Try to change EUID independently of main thread
707         */
708         if (pw) {
709                 setgroups(0, NULL);
710                 setregid(pw->pw_gid, pw->pw_gid);
711                 setreuid(pw->pw_uid, pw->pw_uid);
712         }
713 }
714
715 void *emit_thread()
716 {
717         struct Flow *flow;
718         void *p;
719         struct timeval now;
720         struct timespec timeout;
721         int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
722
723         p = (void *) &emit_packet + netflow->HeaderSize;
724         timeout.tv_nsec = 0;
725
726         setuser();
727
728         for (;;) {
729                 pthread_mutex_lock(&emit_mutex);
730                 while (!flows_emit) {
731                         gettimeofday(&now, 0);
732                         timeout.tv_sec = now.tv_sec + emit_timeout;
733                         /* Do not wait until emit_packet will filled - it may be too long */
734                         if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
735                                 pthread_mutex_unlock(&emit_mutex);
736                                 goto sendit;
737                         }
738                 }
739                 flow = flows_emit;
740                 flows_emit = flows_emit->next;
741 #if ((DEBUG) & DEBUG_I)
742                 emit_queue--;
743 #endif          
744                 pthread_mutex_unlock(&emit_mutex);
745
746 #ifdef UPTIME_TRICK
747                 if (!emit_count) {
748                         gettime(&start_time);
749                         start_time.sec -= start_time_offset;
750                 }
751 #endif
752                 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
753                 mem_free(flow);
754                 emit_count++;
755 #ifdef PF2_DEBUG
756                 printf("Emit count = %d\n", emit_count);
757                 fflush(stdout);
758 #endif
759                 if (emit_count == netflow->MaxFlows) {
760                 sendit:
761                         gettime(&emit_time);
762                         p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
763                         size = netflow->HeaderSize + emit_count * netflow->FlowSize;
764                         /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
765                         if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
766                         peer_rot_cur = 0;
767                         for (i = 0; i < npeers; i++) {
768                                 if (peers[i].type == PEER_FILE) {
769                                                 if (netflow->SeqOffset)
770                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
771                                                 peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
772                                                 ret = write(peers[0].write_fd, emit_packet, size);
773                                                 if (ret < size) {
774
775 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
776                                                         my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
777                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
778 #endif
779 #undef MESSAGES
780                                                 }
781 #if ((DEBUG) & DEBUG_E)
782                                                 commaneelse {
783                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
784                                                                 emit_count, i + 1, peers[i].seq);
785                                                 }
786 #endif
787                                                 peers[0].seq += emit_count;
788
789                                                 /* Rate limit */
790                                                 if (emit_rate_bytes) {
791                                                         sent += size;
792                                                         delay = sent / emit_rate_bytes;
793                                                         if (delay) {
794                                                                 sent %= emit_rate_bytes;
795                                                                 timeout.tv_sec = 0;
796                                                                 timeout.tv_nsec = emit_rate_delay * delay;
797                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
798                                                         }
799                                                 }
800                                         }
801                                 else
802                                 if (peers[i].type == PEER_MIRROR) goto sendreal;
803                                 else
804                                 if (peers[i].type == PEER_ROTATE) 
805                                         if (peer_rot_cur++ == peer_rot_work) {
806                                         sendreal:
807                                                 if (netflow->SeqOffset)
808                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
809                                                 ret = send(peers[i].write_fd, emit_packet, size, 0);
810                                                 if (ret < size) {
811 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
812                                                         my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
813                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
814 #endif
815                                                 }
816 #if ((DEBUG) & DEBUG_E)
817                                                 commaneelse {
818                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
819                                                                 emit_count, i + 1, peers[i].seq);
820                                                 }
821 #endif
822                                                 peers[i].seq += emit_count;
823
824                                                 /* Rate limit */
825                                                 if (emit_rate_bytes) {
826                                                         sent += size;
827                                                         delay = sent / emit_rate_bytes;
828                                                         if (delay) {
829                                                                 sent %= emit_rate_bytes;
830                                                                 timeout.tv_sec = 0;
831                                                                 timeout.tv_nsec = emit_rate_delay * delay;
832                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
833                                                         }
834                                                 }
835                                         }
836                         }
837                         if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
838                         emit_sequence += emit_count;
839                         emit_count = 0;
840 #if ((DEBUG) & DEBUG_I)
841                         emit_pkts++;
842 #endif
843                 }
844         }
845 }       
846
847 void *unpending_thread()
848 {
849         struct timeval now;
850         struct timespec timeout;
851 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
852         char logbuf[256];
853 #endif
854
855         setuser();
856
857         timeout.tv_nsec = 0;
858         pthread_mutex_lock(&unpending_mutex);
859
860         for (;;) {
861                 while (!(pending_tail->flags & FLOW_PENDING)) {
862                         gettimeofday(&now, 0);
863                         timeout.tv_sec = now.tv_sec + unpending_timeout;
864                         pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
865                 }
866
867 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
868                 *logbuf = 0;
869 #endif
870                 if (put_into(pending_tail, COPY_INTO
871 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
872                                 , logbuf
873 #endif
874                         ) < 0) {
875 #if ((DEBUG) & DEBUG_I)
876                         pkts_lost_unpending++;
877 #endif                          
878                 }
879
880 #if ((DEBUG) & DEBUG_U)
881                 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
882 #endif
883
884                 pending_tail->flags = 0;
885                 pending_tail = pending_tail->next;
886 #if ((DEBUG) & DEBUG_I)
887                 pkts_pending_done++;
888 #endif
889         }
890 }
891
892 void *scan_thread()
893 {
894 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
895         char logbuf[256];
896 #endif
897         int i;
898         struct Flow *flow, **flowpp;
899         struct Time now;
900         struct timespec timeout;
901
902         setuser();
903
904         timeout.tv_nsec = 0;
905         pthread_mutex_lock(&scan_mutex);
906
907         for (;;) {
908                 gettime(&now);
909                 timeout.tv_sec = now.sec + scan_interval;
910                 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
911
912                 gettime(&now);
913 #if ((DEBUG) & DEBUG_S)
914                 my_log(LOG_DEBUG, "S: %d", now.sec);
915 #endif
916                 for (i = 0; i < 1 << HASH_BITS ; i++) {
917                         pthread_mutex_lock(&flows_mutex[i]);
918                         flow = flows[i];
919                         flowpp = &flows[i];
920                         while (flow) {
921                                 if (flow->flags & FLOW_FRAG) {
922                                         /* Process fragmented flow */
923                                         if ((now.sec - flow->mtime.sec) > frag_lifetime) {
924                                                 /* Fragmented flow expired - put it into special chain */
925 #if ((DEBUG) & DEBUG_I)
926                                                 flows_fragmented--;
927                                                 flows_total--;
928 #endif
929                                                 *flowpp = flow->next;
930                                                 flow->id = 0;
931                                                 flow->flags &= ~FLOW_FRAG;
932                                                 flow->next = scan_frag_dreg;
933                                                 scan_frag_dreg = flow;
934                                                 flow = *flowpp;
935                                                 continue;
936                                         }
937                                 } else {
938                                         /* Flow is not frgamented */
939                                         if ((now.sec - flow->mtime.sec) > inactive_lifetime
940                                                 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
941                                                 /* Flow expired */
942 #if ((DEBUG) & DEBUG_S)
943                                                 my_log(LOG_DEBUG, "S: E %x", flow);
944 #endif
945 #if ((DEBUG) & DEBUG_I)
946                                                 flows_total--;
947 #endif
948                                                 *flowpp = flow->next;
949                                                 pthread_mutex_lock(&emit_mutex);
950                                                 flow->next = flows_emit;
951                                                 flows_emit = flow;
952 #if ((DEBUG) & DEBUG_I)
953                                                 emit_queue++;
954 #endif                          
955                                                 pthread_mutex_unlock(&emit_mutex);
956                                                 flow = *flowpp;
957                                                 continue;
958                                         }
959                                 }
960                                 flowpp = &flow->next;
961                                 flow = flow->next;
962                         } /* chain loop */
963                         pthread_mutex_unlock(&flows_mutex[i]);
964                 } /* hash loop */
965                 if (flows_emit) pthread_cond_signal(&emit_cond);
966
967                 while (scan_frag_dreg) {
968                         flow = scan_frag_dreg;
969                         scan_frag_dreg = flow->next;
970 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
971                         *logbuf = 0;
972 #endif
973                         put_into(flow, MOVE_INTO
974 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
975                                 , logbuf
976 #endif
977                         );
978 #if ((DEBUG) & DEBUG_S)
979                         my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
980 #endif
981                 }
982         }
983 }
984
985 void *cap_thread()
986 {
987         struct ulog_packet_msg *ulog_msg;
988         struct ip *nl;
989         void *tl;
990         struct Flow *flow;
991         int len, off_frag, psize;
992 #if ((DEBUG) & DEBUG_C)
993         char buf[64];
994         char logbuf[256];
995 #endif
996
997         setuser();
998
999         while (!killed) {
1000                 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1001                 if (len <= 0) {
1002                         my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1003                         continue;
1004                 }
1005                 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1006
1007 #if ((DEBUG) & DEBUG_C)
1008                         sprintf(logbuf, "C: %d", ulog_msg->data_len);
1009 #endif
1010
1011                         nl = (void *) &ulog_msg->payload;
1012                         psize = ulog_msg->data_len;
1013
1014                         /* Sanity check */
1015                         if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1016 #if ((DEBUG) & DEBUG_C)
1017                                 strcat(logbuf, " U");
1018                                 my_log(LOG_DEBUG, "%s", logbuf);
1019 #endif
1020 #if ((DEBUG) & DEBUG_I)
1021                                 pkts_ignored++;
1022 #endif
1023                                 continue;
1024                         }
1025
1026                         if (pending_head->flags) {
1027 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1028                                 my_log(LOG_ERR,
1029 # if ((DEBUG) & DEBUG_C)
1030                                         "%s %s %s", logbuf,
1031 # else
1032                                         "%s %s",
1033 # endif
1034                                         "pending queue full:", "packet lost");
1035 #endif
1036 #if ((DEBUG) & DEBUG_I)
1037                                 pkts_lost_capture++;
1038 #endif
1039                                 goto done;
1040                         }
1041
1042 #if ((DEBUG) & DEBUG_I)
1043                         pkts_total++;
1044 #endif
1045
1046                         flow = pending_head;
1047
1048                         /* ?FIXME? Add sanity check for ip_len? */
1049                         flow->size = ntohs(nl->ip_len);
1050 #if ((DEBUG) & DEBUG_I)
1051                         size_total += flow->size;
1052 #endif
1053
1054                         flow->sip = nl->ip_src;
1055                         flow->dip = nl->ip_dst;
1056                         if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
1057                                 my_log(LOG_INFO, "Received test flow to corewars.org");
1058                         }
1059                         flow->iif = snmp_index(ulog_msg->indev_name);
1060                         flow->oif = snmp_index(ulog_msg->outdev_name);
1061                         flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1062                         flow->proto = nl->ip_p;
1063                         flow->id = 0;
1064                         flow->tcp_flags = 0;
1065                         flow->pkts = 1;
1066                         flow->sizeF = 0;
1067                         flow->sizeP = 0;
1068                         /* Packets captured from OUTPUT table didn't contains valid timestamp */
1069                         if (ulog_msg->timestamp_sec) {
1070                                 flow->ctime.sec = ulog_msg->timestamp_sec;
1071                                 flow->ctime.usec = ulog_msg->timestamp_usec;
1072                         } else gettime(&flow->ctime);
1073                         flow->mtime = flow->ctime;
1074
1075                         off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1076
1077                         /*
1078                         Offset (from network layer) to transport layer header/IP data
1079                         IOW IP header size ;-)
1080
1081                         ?FIXME?
1082                         Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1083                         */
1084                         off_tl = nl->ip_hl << 2;
1085                         tl = (void *) nl + off_tl;
1086
1087                         /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1088                         flow->sizeF = ntohs(nl->ip_len) - off_tl;
1089                         psize -= off_tl;
1090                         if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1091                         if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1092
1093                         if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1094                                 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1095 #if ((DEBUG) & DEBUG_C)
1096                                 strcat(logbuf, " F");
1097 #endif
1098 #if ((DEBUG) & DEBUG_I)
1099                                 pkts_total_fragmented++;
1100 #endif
1101                                 flow->flags |= FLOW_FRAG;
1102                                 flow->id = nl->ip_id;
1103
1104                                 if (!(ntohs(nl->ip_off) & IP_MF)) {
1105                                         /* Packet whith IP_MF contains information about whole datagram size */
1106                                         flow->flags |= FLOW_LASTFRAG;
1107                                         /* size = frag_offset*8 + data_size */
1108                                         flow->sizeP = off_frag + flow->sizeF;
1109                                 }
1110                         }
1111
1112 #if ((DEBUG) & DEBUG_C)
1113                         sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1114                         strcat(logbuf, buf);
1115                         sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1116                         strcat(logbuf, buf);
1117 #endif
1118
1119                         /*
1120                         Fortunately most interesting transport layer information fit
1121                         into first 8 bytes of IP data field (minimal nonzero size).
1122                         Thus we don't need actual packet reassembling to build whole
1123                         transport layer data. We only check the fragment offset for
1124                         zero value to find packet with this information.
1125                         */
1126                         if (!off_frag && psize >= 8) {
1127                                 switch (flow->proto) {
1128                                         case IPPROTO_TCP:
1129                                         case IPPROTO_UDP:
1130                                                 flow->sp = ((struct udphdr *)tl)->uh_sport;
1131                                                 flow->dp = ((struct udphdr *)tl)->uh_dport;
1132                                                 goto tl_known;
1133
1134 #ifdef ICMP_TRICK
1135                                         case IPPROTO_ICMP:
1136                                                 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1137                                                 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1138                                                 goto tl_known;
1139 #endif
1140 #ifdef ICMP_TRICK_CISCO
1141                                         case IPPROTO_ICMP:
1142                                                 flow->dp = *((int32_t *) tl);
1143                                                 goto tl_known;
1144 #endif
1145
1146                                         default:
1147                                                 /* Unknown transport layer */
1148 #if ((DEBUG) & DEBUG_C)
1149                                                 strcat(logbuf, " U");
1150 #endif
1151                                                 flow->sp = 0;
1152                                                 flow->dp = 0;
1153                                                 break;
1154
1155                                         tl_known:
1156 #if ((DEBUG) & DEBUG_C)
1157                                                 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1158                                                 strcat(logbuf, buf);
1159 #endif
1160                                                 flow->flags |= FLOW_TL;
1161                                 }
1162                         }
1163
1164                         /* Check for tcp flags presence (including CWR and ECE). */
1165                         if (flow->proto == IPPROTO_TCP
1166                                 && off_frag < 16
1167                                 && psize >= 16 - off_frag) {
1168                                 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1169 #if ((DEBUG) & DEBUG_C)
1170                                 sprintf(buf, " TCP:%x", flow->tcp_flags);
1171                                 strcat(logbuf, buf);
1172 #endif
1173                         }
1174
1175 #if ((DEBUG) & DEBUG_C)
1176                         sprintf(buf, " => %x", (unsigned) flow);
1177                         strcat(logbuf, buf);
1178                         my_log(LOG_DEBUG, "%s", logbuf);
1179 #endif
1180
1181 #if ((DEBUG) & DEBUG_I)
1182                         pkts_pending++;
1183                         pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1184                         if (pending_queue_trace < pending_queue_trace_candidate)
1185                                 pending_queue_trace = pending_queue_trace_candidate;
1186 #endif
1187
1188                         /* Flow complete - inform unpending_thread() about it */
1189                         pending_head->flags |= FLOW_PENDING;
1190                         pending_head = pending_head->next;
1191                 done:
1192                         pthread_cond_signal(&unpending_cond);
1193                 }
1194         }
1195         return 0;
1196 }
1197
1198 int main(int argc, char **argv)
1199 {
1200         char errpbuf[512];
1201         char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1202         int c, i, write_fd, memory_limit = 0;
1203         struct addrinfo hints, *res;
1204         struct sockaddr_in saddr;
1205         pthread_attr_t tattr;
1206         struct sigaction sigact;
1207         static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1208         struct timeval timeout;
1209
1210         sched_min = sched_get_priority_min(SCHED);
1211         sched_max = sched_get_priority_max(SCHED);
1212
1213         memset(&saddr, 0 , sizeof(saddr));
1214         memset(&hints, 0 , sizeof(hints));
1215         hints.ai_flags = AI_PASSIVE;
1216         hints.ai_family = AF_INET;
1217         hints.ai_socktype = SOCK_DGRAM;
1218
1219         /* Process command line options */
1220
1221         opterr = 0;
1222         while ((c = my_getopt(argc, argv, parms)) != -1) {
1223                 switch (c) {
1224                         case '?':
1225                                 usage();
1226
1227                         case 'h':
1228                                 usage();
1229                 }
1230         }
1231
1232         if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1233         if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1234         if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1235         if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1236         if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1237         if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1238         if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1239         if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1240         if (parms[nflag].count) {
1241                 switch (atoi(parms[nflag].arg)) {
1242                         case 1:
1243                                 netflow = &NetFlow1;
1244                                 break;
1245
1246                         case 5:
1247                                 break;
1248
1249                         case 7:
1250                                 netflow = &NetFlow7;
1251                                 break;
1252
1253                         default:
1254                                 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1255                                 exit(1);
1256                 }
1257         }
1258         if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1259         if (parms[lflag].count) {
1260                 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1261                         *log_suffix++ = 0;
1262                         if (*log_suffix) {
1263                                 sprintf(errpbuf, "[%s]", log_suffix);
1264                                 strcat(ident, errpbuf);
1265                         }
1266                 }
1267                 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1268                 if (log_suffix) *--log_suffix = ':';
1269         }
1270         if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1271         err_malloc:
1272                 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1273                 exit(1);
1274         }
1275         sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1276         if (parms[qflag].count) {
1277                 pending_queue_length = atoi(parms[qflag].arg);
1278                 if (pending_queue_length < 1) {
1279                         fprintf(stderr, "Illegal %s\n", "pending queue length");
1280                         exit(1);
1281                 }
1282         }
1283         if (parms[rflag].count) {
1284                 schedp.sched_priority = atoi(parms[rflag].arg);
1285                 if (schedp.sched_priority
1286                         && (schedp.sched_priority < sched_min
1287                                 || schedp.sched_priority > sched_max)) {
1288                         fprintf(stderr, "Illegal %s\n", "realtime priority");
1289                         exit(1);
1290                 }
1291         }
1292         if (parms[Bflag].count) {
1293                 sockbufsize = atoi(parms[Bflag].arg) << 10;
1294         }
1295         if (parms[bflag].count) {
1296                 bulk_quantity = atoi(parms[bflag].arg);
1297                 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1298                         fprintf(stderr, "Illegal %s\n", "bulk size");
1299                         exit(1);
1300                 }
1301         }
1302         if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1303         if (parms[Xflag].count) {
1304                 for(i = 0; parms[Xflag].arg[i]; i++)
1305                         if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1306                 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1307                         goto err_malloc;
1308                 rule = strtok(parms[Xflag].arg, ":");
1309                 for (i = 0; rule; i++) {
1310                         snmp_rules[i].len = strlen(rule);
1311                         if (snmp_rules[i].len > IFNAMSIZ) {
1312                                 fprintf(stderr, "Illegal %s\n", "interface basename");
1313                                 exit(1);
1314                         }
1315                         strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1316                         if (!*(rule - 1)) *(rule - 1) = ',';
1317                         rule = strtok(NULL, ",");
1318                         if (!rule) {
1319                                 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1320                                 exit(1);
1321                         }
1322                         snmp_rules[i].base = atoi(rule);
1323                         *(rule - 1) = ':';
1324                         rule = strtok(NULL, ":");
1325                 }
1326                 nsnmp_rules = i;
1327         }
1328         if (parms[tflag].count)
1329                 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1330         if (parms[aflag].count) {
1331                 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1332                 bad_lhost:
1333                         fprintf(stderr, "Illegal %s\n", "source address");
1334                         exit(1);
1335                 } else {
1336                         saddr = *((struct sockaddr_in *) res->ai_addr);
1337                         freeaddrinfo(res);
1338                 }
1339         }
1340         if (parms[uflag].count) 
1341                 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1342                         fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1343                         exit(1);
1344                 }
1345
1346
1347         /* Process collectors parameters. Brrrr... :-[ */
1348
1349         npeers = argc - optind;
1350         if (npeers > 1) {
1351                 /* Send to remote Netflow collector */
1352                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1353                 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1354                         dhost = argv[i];
1355                         if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1356                         *dport++ = 0;
1357                         if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1358                                 fprintf(stderr, "socket(): %s\n", strerror(errno));
1359                                 exit(1);
1360                         }
1361                         peers[npeers].write_fd = write_fd;
1362                         peers[npeers].type = PEER_MIRROR;
1363                         peers[npeers].laddr = saddr;
1364                         peers[npeers].seq = 0;
1365                         if ((lhost = strchr(dport, '/'))) {
1366                                 *lhost++ = 0;
1367                                 if ((type = strchr(lhost, '/'))) {
1368                                         *type++ = 0;
1369                                         switch (*type) {
1370                                                 case 0:
1371                                                 case 'm':
1372                                                         break;
1373
1374                                                 case 'r':
1375                                                         peers[npeers].type = PEER_ROTATE;
1376                                                         npeers_rot++;
1377                                                         break;
1378
1379                                                 default:
1380                                                         goto bad_collector;
1381                                         }
1382                                 }
1383                                 if (*lhost) {
1384                                         if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1385                                         peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1386                                         freeaddrinfo(res);
1387                                 }
1388                         }
1389                         if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1390                                                 sizeof(struct sockaddr_in))) {
1391                                 fprintf(stderr, "bind(): %s\n", strerror(errno));
1392                                 exit(1);
1393                         }
1394                         if (getaddrinfo(dhost, dport, &hints, &res)) {
1395 bad_collector:
1396                                 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1397                                 exit(1);
1398                         }
1399                         peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1400                         freeaddrinfo(res);
1401                         if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1402                                                 sizeof(struct sockaddr_in))) {
1403                                 fprintf(stderr, "connect(): %s\n", strerror(errno));
1404                                 exit(1);
1405                         }
1406
1407                         /* Restore command line */
1408                         if (type) *--type = '/';
1409                         if (lhost) *--lhost = '/';
1410                         *--dport = ':';
1411                 }
1412         }
1413         else if (parms[fflag].count) {
1414                 // log into a file
1415                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1416                 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1417                 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1418                 
1419                 peers[npeers].write_fd = -1;
1420                 peers[npeers].type = PEER_FILE;
1421                 peers[npeers].seq = 0;
1422                 npeers++;
1423         }
1424         else 
1425                 usage();
1426
1427
1428         if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1429         ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1430         if (!ulog_handle) {
1431                 fprintf(stderr, "libipulog initialization error: %s",
1432                         ipulog_strerror(ipulog_errno));
1433                 exit(1);
1434         }
1435         if (sockbufsize)
1436                 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1437                         &sockbufsize, sizeof(sockbufsize)) < 0)
1438                         fprintf(stderr, "setsockopt(): %s", strerror(errno));
1439
1440         /* Daemonize (if log destination stdout-free) */
1441
1442         my_log_open(ident, verbosity, log_dest);
1443         if (!(log_dest & 2)) {
1444                 switch (fork()) {
1445                         case -1:
1446                                 fprintf(stderr, "fork(): %s", strerror(errno));
1447                                 exit(1);
1448
1449                         case 0:
1450                                 setsid();
1451                                 freopen("/dev/null", "r", stdin);
1452                                 freopen("/dev/null", "w", stdout);
1453                                 freopen("/dev/null", "w", stderr);
1454                                 break;
1455
1456                         default:
1457                                 exit(0);
1458                 }
1459         } else {
1460                 setvbuf(stdout, (char *)0, _IONBF, 0);
1461                 setvbuf(stderr, (char *)0, _IONBF, 0);
1462         }
1463
1464         pid = getpid();
1465         sprintf(errpbuf, "[%ld]", (long) pid);
1466         strcat(ident, errpbuf);
1467
1468         /* Initialization */
1469
1470         hash_init(); /* Actually for crc16 only */
1471         mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1472         for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1473
1474 #ifdef UPTIME_TRICK
1475         /* Hope 12 days is enough :-/ */
1476         start_time_offset = 1 << 20;
1477
1478         /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1479 #endif
1480         gettime(&start_time);
1481
1482         /*
1483         Build static pending queue as circular buffer.
1484         */
1485         if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1486         pending_tail = pending_head;
1487         for (i = pending_queue_length - 1; i--;) {
1488                 if (!(pending_tail->next = mem_alloc())) {
1489                 err_mem_alloc:
1490                         my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1491                         exit(1);
1492                 }
1493                 pending_tail = pending_tail->next;
1494         }
1495         pending_tail->next = pending_head;
1496         pending_tail = pending_head;
1497
1498         sigemptyset(&sig_mask);
1499         sigact.sa_handler = &sighandler;
1500         sigact.sa_mask = sig_mask;
1501         sigact.sa_flags = 0;
1502         sigaddset(&sig_mask, SIGTERM);
1503         sigaction(SIGTERM, &sigact, 0);
1504 #if ((DEBUG) & DEBUG_I)
1505         sigaddset(&sig_mask, SIGUSR1);
1506         sigaction(SIGUSR1, &sigact, 0);
1507 #endif
1508         if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1509                 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1510                 exit(1);
1511         }
1512
1513         my_log(LOG_INFO, "Starting %s...", VERSION);
1514
1515         if (parms[cflag].count) {
1516                 if (chdir(parms[cflag].arg) || chroot(".")) {
1517                         my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1518                         exit(1);
1519                 }
1520         }
1521
1522         schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1523         pthread_attr_init(&tattr);
1524         for (i = 0; i < THREADS - 1; i++) {
1525                 if (schedp.sched_priority > 0) {
1526                         if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1527                                 (pthread_attr_setschedparam(&tattr, &schedp))) {
1528                                 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1529                                 exit(1);
1530                         }
1531                 }
1532                 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1533                         my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1534                         exit(1);
1535                 }
1536                 pthread_detach(thid);
1537                 schedp.sched_priority++;
1538         }
1539
1540         if (pw) {
1541                 if (setgroups(0, NULL)) {
1542                         my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1543                         exit(1);
1544                 }
1545                 if (setregid(pw->pw_gid, pw->pw_gid)) {
1546                         my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1547                         exit(1);
1548                 }
1549                 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1550                         my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1551                         exit(1);
1552                 }
1553         }
1554
1555         if (!(pidfile = fopen(pidfilepath, "w")))
1556                 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1557         else {
1558                 fprintf(pidfile, "%ld\n", (long) pid);
1559                 fclose(pidfile);
1560         }
1561
1562         my_log(LOG_INFO, "pid: %d", pid);
1563         my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1564                 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1565                 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1566                 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1567                 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1568                 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1569                 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1570         for (i = 0; i < nsnmp_rules; i++) {
1571                 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1572                         i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1573         }
1574         for (i = 0; i < npeers; i++) {
1575                 switch (peers[i].type) {
1576                         case PEER_MIRROR:
1577                                 c = 'm';
1578                                 break;
1579                         case PEER_ROTATE:
1580                                 c = 'r';
1581                                 break;
1582                 }
1583                 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1584                 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1585                         inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1586         }
1587
1588         pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1589
1590         timeout.tv_usec = 0;
1591         while (!killed
1592                 || (total_elements - free_elements - pending_queue_length)
1593                 || emit_count
1594                 || pending_tail->flags) {
1595
1596                 if (!sigs) {
1597                         timeout.tv_sec = scan_interval;
1598                         select(0, 0, 0, 0, &timeout);
1599                 }
1600
1601                 if (sigs & SIGTERM_MASK && !killed) {
1602                         sigs &= ~SIGTERM_MASK;
1603                         my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1604                         scan_interval = 1;
1605                         frag_lifetime = -1;
1606                         active_lifetime = -1;
1607                         inactive_lifetime = -1;
1608                         emit_timeout = 1;
1609                         unpending_timeout = 1;
1610                         killed = 1;
1611                         pthread_cond_signal(&scan_cond);
1612                         pthread_cond_signal(&unpending_cond);
1613                 }
1614
1615 #if ((DEBUG) & DEBUG_I)
1616                 if (sigs & SIGUSR1_MASK) {
1617                         sigs &= ~SIGUSR1_MASK;
1618                         info_debug();
1619                 }
1620 #endif
1621         }
1622         remove(pidfilepath);
1623 #if ((DEBUG) & DEBUG_I)
1624         info_debug();
1625 #endif
1626         my_log(LOG_INFO, "Done.");
1627 #ifdef WALL
1628         return 0;
1629 #endif
1630 }