Minor bugfix
[fprobe-ulog.git] / src / fprobe-ulog.c
1 /*
2         Copyright (C) Slava Astashonok <sla@0n.ru>
3
4         This program is free software; you can redistribute it and/or
5         modify it under the terms of the GNU General Public License.
6
7         $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
8
9         7/11/2007       Sapan Bhatia <sapanb@cs.princeton.edu> 
10                         
11                 Added data collection (-f) functionality, xid support in the header and log file
12                 rotation.
13 */
14
15 #include <common.h>
16
17 /* stdout, stderr, freopen() */
18 #include <stdio.h>
19
20 /* atoi(), exit() */
21 #include <stdlib.h>
22
23 /* getopt(), alarm(), getpid(), sedsid(), chdir() */
24 #include <unistd.h>
25
26 /* strerror() */
27 #include <string.h>
28
29 /* sig*() */
30 #include <signal.h>
31
32 /* statfs() */
33
34 #include <sys/statfs.h>
35
36 #include <libipulog/libipulog.h>
37 struct ipulog_handle {
38         int fd;
39         u_int8_t blocking;
40         struct sockaddr_nl local;
41         struct sockaddr_nl peer;
42         struct nlmsghdr* last_nlhdr;
43 };
44
45 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
46 #include <sys/types.h>
47 #include <netinet/in_systm.h>
48 #include <sys/socket.h>
49 #include <netinet/in.h>
50 #include <arpa/inet.h>
51 #include <netinet/ip.h>
52 #include <netinet/tcp.h>
53 #include <netinet/udp.h>
54 #include <netinet/ip_icmp.h>
55 #include <net/if.h>
56
57 #include <sys/param.h>
58 #include <pwd.h>
59 #ifdef OS_LINUX
60 #include <grp.h>
61 #endif
62
63 /* pthread_*() */
64 #include <pthread.h>
65
66 /* errno */
67 #include <errno.h>
68
69 /* getaddrinfo() */
70 #include <netdb.h>
71
72 /* nanosleep() */
73 #include <time.h>
74
75 /* gettimeofday() */
76 #include <sys/time.h>
77
78 /* scheduling */
79 #include <sched.h>
80
81 /* select() (POSIX)*/
82 #include <sys/select.h>
83
84 /* open() */
85 #include <sys/stat.h>
86 #include <fcntl.h>
87
88 #include <fprobe-ulog.h>
89 #include <my_log.h>
90 #include <my_getopt.h>
91 #include <netflow.h>
92 #include <hash.h>
93 #include <mem.h>
94
95 enum {
96         aflag,
97         Bflag,
98         bflag,
99         cflag,
100         dflag,
101         Dflag,
102         eflag,
103         Eflag,
104         fflag,
105         gflag,
106         hflag,
107         lflag,
108         mflag,
109         Mflag,
110         nflag,
111         qflag,
112         rflag,
113         sflag,
114         tflag,
115         Tflag,
116         Uflag,
117         uflag,
118         vflag,
119         Wflag,
120         Xflag,
121 };
122
123 static struct getopt_parms parms[] = {
124         {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
125         {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
126         {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
127         {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
128         {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
129         {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
130         {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
131         {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
132         {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
133         {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
134         {'h', 0, 0, 0},
135         {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
136         {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
137         {'M', 0, 0, 0},
138         {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
139         {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
140         {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
141         {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
142         {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
143         {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
144         {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
145         {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
146         {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
147         {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
148         {0, 0, 0, 0}
149 };
150
151 extern char *optarg;
152 extern int optind, opterr, optopt;
153 extern int errno;
154
155 extern struct NetFlow NetFlow1;
156 extern struct NetFlow NetFlow5;
157 extern struct NetFlow NetFlow7;
158
159 #define START_VALUE -5
160 #define mark_is_tos parms[Mflag].count
161 static unsigned scan_interval = 5;
162 static int min_free = 0;
163 static int frag_lifetime = 30;
164 static int inactive_lifetime = 60;
165 static int active_lifetime = 300;
166 static int sockbufsize;
167 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
168 #if (MEM_BITS == 0) || (MEM_BITS == 16)
169 #define BULK_QUANTITY 10000
170 #else
171 #define BULK_QUANTITY 200
172 #endif
173
174 static unsigned epoch_length=60, log_epochs=1; 
175 static unsigned cur_epoch=0,prev_uptime=0;
176
177 static unsigned bulk_quantity = BULK_QUANTITY;
178 static unsigned pending_queue_length = 100;
179 static struct NetFlow *netflow = &NetFlow5;
180 static unsigned verbosity = 6;
181 static unsigned log_dest = MY_LOG_SYSLOG;
182 static struct Time start_time;
183 static long start_time_offset;
184 static int off_tl;
185 /* From mem.c */
186 extern unsigned total_elements;
187 extern unsigned free_elements;
188 extern unsigned total_memory;
189 #if ((DEBUG) & DEBUG_I)
190 static unsigned emit_pkts, emit_queue;
191 static uint64_t size_total;
192 static unsigned pkts_total, pkts_total_fragmented;
193 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
194 static unsigned pkts_pending, pkts_pending_done;
195 static unsigned pending_queue_trace, pending_queue_trace_candidate;
196 static unsigned flows_total, flows_fragmented;
197 #endif
198 static unsigned emit_count;
199 static uint32_t emit_sequence;
200 static unsigned emit_rate_bytes, emit_rate_delay;
201 static struct Time emit_time;
202 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
203 static pthread_t thid;
204 static sigset_t sig_mask;
205 static struct sched_param schedp;
206 static int sched_min, sched_max;
207 static int npeers, npeers_rot;
208 static struct peer *peers;
209 static int sigs;
210
211 static struct Flow *flows[1 << HASH_BITS];
212 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
213
214 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
215 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
216
217 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
218 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
219 static struct Flow *pending_head, *pending_tail;
220 static struct Flow *scan_frag_dreg;
221
222 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
223 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
224 static struct Flow *flows_emit;
225
226 static char ident[256] = "fprobe-ulog";
227 static FILE *pidfile;
228 static char *pidfilepath;
229 static pid_t pid;
230 static int killed;
231 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
232 static struct ipulog_handle *ulog_handle;
233 static uint32_t ulog_gmask = 1;
234 static char *cap_buf;
235 static int nsnmp_rules;
236 static struct snmp_rule *snmp_rules;
237 static struct passwd *pw = 0;
238
239 void usage()
240 {
241         fprintf(stdout,
242                 "fprobe-ulog: a NetFlow probe. Version %s\n"
243                 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
244                 "\n"
245                 "-h\t\tDisplay this help\n"
246                 "-U <mask>\tULOG group bitwise mask [1]\n"
247                 "-s <seconds>\tHow often scan for expired flows [5]\n"
248                 "-g <seconds>\tFragmented flow lifetime [30]\n"
249                 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
250                 "-f <filename>\tLog flow data in a file\n"
251                 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
252                 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
253                 "-a <address>\tUse <address> as source for NetFlow flow\n"
254                 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
255                 "-M\t\tUse netfilter mark value as ToS flag\n"
256                 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
257                 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
258                 "-q <flows>\tPending queue length [100]\n"
259                 "-B <kilobytes>\tKernel capture buffer size [0]\n"
260                 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
261                 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
262                 "-c <directory>\tDirectory to chroot to\n"
263                 "-u <user>\tUser to run as\n"
264                 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
265                 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
266                 "-y <remote:port>\tAddress of the NetFlow collector\n"
267                 "-f <writable file>\tFile to write data into\n"
268                 "-T <n>\tRotate log file every n epochs\n"
269                 "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
270                 "-E <[1..60]>\tSize of an epoch in minutes\n"
271                 "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
272                 ,
273                 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
274         exit(0);
275 }
276
277 #if ((DEBUG) & DEBUG_I)
278 void info_debug()
279 {
280         my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
281                 pkts_total, pkts_total_fragmented, size_total,
282                 pkts_pending - pkts_pending_done, pending_queue_trace);
283         my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
284                 pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
285         my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
286                 flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
287         my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
288                 total_elements, free_elements, total_memory);
289 }
290 #endif
291
292 void sighandler(int sig)
293 {
294         switch (sig) {
295                 case SIGTERM:
296                         sigs |= SIGTERM_MASK;
297                         break;
298 #if ((DEBUG) & DEBUG_I)
299                 case SIGUSR1:
300                         sigs |= SIGUSR1_MASK;
301                         break;
302 #endif
303         }
304 }
305
306 void gettime(struct Time *now)
307 {
308         struct timeval t;
309
310         gettimeofday(&t, 0);
311         now->sec = t.tv_sec;
312         now->usec = t.tv_usec;
313 }
314
315
316 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
317 {
318         return (t1->sec - t2->sec)/60;
319 }
320
321 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
322 {
323         return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
324 }
325
326 /* Uptime in miliseconds */
327 uint32_t getuptime(struct Time *t)
328 {
329         /* Maximum uptime is about 49/2 days */
330         return cmpmtime(t, &start_time);
331 }
332
333 /* Uptime in minutes */
334 uint32_t getuptime_minutes(struct Time *t)
335 {
336         /* Maximum uptime is about 49/2 days */
337         return cmpMtime(t, &start_time);
338 }
339
340 hash_t hash_flow(struct Flow *flow)
341 {
342         if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
343         else return hash(flow, sizeof(struct Flow_TL));
344 }
345
346 uint16_t snmp_index(char *name) {
347         uint32_t i;
348
349         if (!*name) return 0;
350
351         for (i = 0; (int) i < nsnmp_rules; i++) {
352                 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
353                 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
354         }
355
356         if ((i = if_nametoindex(name))) return i;
357
358         return -1;
359 }
360
361 inline void copy_flow(struct Flow *src, struct Flow *dst)
362 {
363         dst->iif = src->iif;
364         dst->oif = src->oif;
365         dst->sip = src->sip;
366         dst->dip = src->dip;
367         dst->tos = src->tos;
368         dst->proto = src->proto;
369         dst->tcp_flags = src->tcp_flags;
370         dst->id = src->id;
371         dst->sp = src->sp;
372         dst->dp = src->dp;
373         dst->pkts = src->pkts;
374         dst->size = src->size;
375         dst->sizeF = src->sizeF;
376         dst->sizeP = src->sizeP;
377         dst->ctime = src->ctime;
378         dst->mtime = src->mtime;
379         dst->flags = src->flags;
380 }
381
382 void get_cur_epoch() {
383         int fd;
384         fd = open("/tmp/fprobe_last_epoch",O_RDONLY);
385         if (fd != -1) {
386                 char snum[7];
387                 ssize_t len;
388                 len = read(fd, snum, sizeof(snum)-1);
389                 if (len != -1) {
390                         snum[len]='\0';
391                         sscanf(snum,"%d",&cur_epoch);
392                         close(fd);
393                 }
394         }
395         return;
396 }
397
398
399 void update_cur_epoch_file(int n) {
400         int fd, len;
401         char snum[7];
402         len=snprintf(snum,6,"%d",n);
403         fd = open("/tmp/fprobe_last_epoch",O_WRONLY|O_CREAT|O_TRUNC);
404         if (fd == -1) {
405                 my_log(LOG_ERR, "open() failed: /tmp/fprobe_last_epoch.The next restart will resume logging from epoch id 0.");
406                 return;
407         }
408         write(fd, snum, len);
409         close(fd);
410 }
411
412 unsigned get_log_fd(char *fname, unsigned cur_fd) {
413         struct Time now;
414         unsigned cur_uptime;
415         /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
416          * doesn't solve the problem */
417
418         struct statfs statfs;
419         int ret_fd;
420         gettime(&now);
421         cur_uptime = getuptime_minutes(&now);
422
423         if (fstatfs(cur_fd, &statfs) && cur_fd!=START_VALUE) {
424                 my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
425         }
426         else {
427                 if (min_free && statfs.f_bfree < min_free) 
428                         switch(cur_epoch) {
429                                 case 0: /* Uh oh. Our first file filled up all of the free space. Just bail out. */
430                                         my_log(LOG_ERR, "The first epoch filled up all the free space on disk. Bailing out.");
431                                         exit(1);
432                                 default:
433                                         my_log(LOG_INFO, "Disk almost full. I'm going to drop data. Max epochs = %d\n",cur_epoch);
434                                         cur_epoch = -1;
435                         }
436         }
437
438         /* Epoch length in minutes */
439         if ((cur_uptime - prev_uptime) > epoch_length || cur_fd<0 || cur_epoch==-1) {
440                 char nextname[MAX_PATH_LEN];
441                 int write_fd;
442                 prev_uptime = cur_uptime;
443                 cur_epoch = (cur_epoch + 1) % log_epochs;
444                 close(cur_fd);
445                 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
446                 if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC)) < 0) {
447                         my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
448                         exit(1);
449                 }
450                 update_cur_epoch_file(cur_epoch);
451                 ret_fd = write_fd;
452         }
453         else
454                 ret_fd = cur_fd;
455         return(ret_fd);
456 }
457
458 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
459 {
460         struct Flow **flowpp;
461
462 #ifdef WALL
463         flowpp = 0;
464 #endif
465
466         if (prev) flowpp = *prev;
467
468         while (where) {
469                 if (where->sip.s_addr == what->sip.s_addr
470                         && where->dip.s_addr == what->dip.s_addr
471                         && where->proto == what->proto) {
472                         switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
473                                 case 0:
474                                         /* Both unfragmented */
475                                         if ((what->sp == where->sp)
476                                                 && (what->dp == where->dp)) goto done;
477                                         break;
478                                 case 2:
479                                         /* Both fragmented */
480                                         if (where->id == what->id) goto done;
481                                         break;
482                         }
483                 }
484                 flowpp = &where->next;
485                 where = where->next;
486         }
487 done:
488         if (prev) *prev = flowpp;
489         return where;
490 }
491
492 int put_into(struct Flow *flow, int flag
493 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
494         , char *logbuf
495 #endif
496 )
497 {
498         int ret = 0;
499         hash_t h;
500         struct Flow *flown, **flowpp;
501 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
502         char buf[64];
503 #endif
504
505         h = hash_flow(flow);
506 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
507         sprintf(buf, " %x H:%04x", (unsigned) flow, h);
508         strcat(logbuf, buf);
509 #endif
510         pthread_mutex_lock(&flows_mutex[h]);
511         flowpp = &flows[h];
512         if (!(flown = find(flows[h], flow, &flowpp))) {
513                 /* No suitable flow found - add */
514                 if (flag == COPY_INTO) {
515                         if ((flown = mem_alloc())) {
516                                 copy_flow(flow, flown);
517                                 flow = flown;
518                         } else {
519 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
520                                 my_log(LOG_ERR, "%s %s. %s",
521                                         "mem_alloc():", strerror(errno), "packet lost");
522 #endif
523                                 return -1;
524                         }
525                 }
526                 flow->next = flows[h];
527                 flows[h] = flow;
528 #if ((DEBUG) & DEBUG_I)
529                 flows_total++;
530                 if (flow->flags & FLOW_FRAG) flows_fragmented++;
531 #endif
532 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
533                 if (flown) {
534                         sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
535                         strcat(logbuf, buf);
536                 }
537 #endif
538         } else {
539                 /* Found suitable flow - update */
540 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
541                 sprintf(buf, " +> %x", (unsigned) flown);
542                 strcat(logbuf, buf);
543 #endif
544                 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
545                         flown->mtime = flow->mtime;
546                 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
547                         flown->ctime = flow->ctime;
548                 flown->tcp_flags |= flow->tcp_flags;
549                 flown->size += flow->size;
550                 flown->pkts += flow->pkts;
551                 if (flow->flags & FLOW_FRAG) {
552                         /* Fragmented flow require some additional work */
553                         if (flow->flags & FLOW_TL) {
554                                 /*
555                                 ?FIXME?
556                                 Several packets with FLOW_TL (attack)
557                                 */
558                                 flown->sp = flow->sp;
559                                 flown->dp = flow->dp;
560                         }
561                         if (flow->flags & FLOW_LASTFRAG) {
562                                 /*
563                                 ?FIXME?
564                                 Several packets with FLOW_LASTFRAG (attack)
565                                 */
566                                 flown->sizeP = flow->sizeP;
567                         }
568                         flown->flags |= flow->flags;
569                         flown->sizeF += flow->sizeF;
570                         if ((flown->flags & FLOW_LASTFRAG)
571                                 && (flown->sizeF >= flown->sizeP)) {
572                                 /* All fragments received - flow reassembled */
573                                 *flowpp = flown->next;
574                                 pthread_mutex_unlock(&flows_mutex[h]);
575 #if ((DEBUG) & DEBUG_I)
576                                 flows_total--;
577                                 flows_fragmented--;
578 #endif
579                                 flown->id = 0;
580                                 flown->flags &= ~FLOW_FRAG;
581 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
582                                 strcat(logbuf," R");
583 #endif
584                                 ret = put_into(flown, MOVE_INTO
585 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
586                                                 , logbuf
587 #endif
588                                         );
589                         }
590                 }
591                 if (flag == MOVE_INTO) mem_free(flow);
592         }
593         pthread_mutex_unlock(&flows_mutex[h]);
594         return ret;
595 }
596
597 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
598 {
599         int i;
600
601         for (i = 0; i < fields; i++) {
602 #if ((DEBUG) & DEBUG_F)
603                 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
604 #endif
605                 switch (format[i]) {
606                         case NETFLOW_IPV4_SRC_ADDR:
607                                 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
608                                 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
609                                 break;
610
611                         case NETFLOW_IPV4_DST_ADDR:
612                                 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
613                                 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
614                                         my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
615                                 }
616                                 p += NETFLOW_IPV4_DST_ADDR_SIZE;
617                                 break;
618
619                         case NETFLOW_INPUT_SNMP:
620                                 *((uint16_t *) p) = htons(flow->iif);
621                                 p += NETFLOW_INPUT_SNMP_SIZE;
622                                 break;
623
624                         case NETFLOW_OUTPUT_SNMP:
625                                 *((uint16_t *) p) = htons(flow->oif);
626                                 p += NETFLOW_OUTPUT_SNMP_SIZE;
627                                 break;
628
629                         case NETFLOW_PKTS_32:
630                                 *((uint32_t *) p) = htonl(flow->pkts);
631                                 p += NETFLOW_PKTS_32_SIZE;
632                                 break;
633
634                         case NETFLOW_BYTES_32:
635                                 *((uint32_t *) p) = htonl(flow->size);
636                                 p += NETFLOW_BYTES_32_SIZE;
637                                 break;
638
639                         case NETFLOW_FIRST_SWITCHED:
640                                 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
641                                 p += NETFLOW_FIRST_SWITCHED_SIZE;
642                                 break;
643
644                         case NETFLOW_LAST_SWITCHED:
645                                 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
646                                 p += NETFLOW_LAST_SWITCHED_SIZE;
647                                 break;
648
649                         case NETFLOW_L4_SRC_PORT:
650                                 *((uint16_t *) p) = flow->sp;
651                                 p += NETFLOW_L4_SRC_PORT_SIZE;
652                                 break;
653
654                         case NETFLOW_L4_DST_PORT:
655                                 *((uint16_t *) p) = flow->dp;
656                                 p += NETFLOW_L4_DST_PORT_SIZE;
657                                 break;
658
659                         case NETFLOW_PROT:
660                                 *((uint8_t *) p) = flow->proto;
661                                 p += NETFLOW_PROT_SIZE;
662                                 break;
663
664                         case NETFLOW_SRC_TOS:
665                                 *((uint8_t *) p) = flow->tos;
666                                 p += NETFLOW_SRC_TOS_SIZE;
667                                 break;
668
669                         case NETFLOW_TCP_FLAGS:
670                                 *((uint8_t *) p) = flow->tcp_flags;
671                                 p += NETFLOW_TCP_FLAGS_SIZE;
672                                 break;
673
674                         case NETFLOW_VERSION:
675                                 *((uint16_t *) p) = htons(netflow->Version);
676                                 p += NETFLOW_VERSION_SIZE;
677                                 break;
678
679                         case NETFLOW_COUNT:
680                                 *((uint16_t *) p) = htons(emit_count);
681                                 p += NETFLOW_COUNT_SIZE;
682                                 break;
683
684                         case NETFLOW_UPTIME:
685                                 *((uint32_t *) p) = htonl(getuptime(&emit_time));
686                                 p += NETFLOW_UPTIME_SIZE;
687                                 break;
688
689                         case NETFLOW_UNIX_SECS:
690                                 *((uint32_t *) p) = htonl(emit_time.sec);
691                                 p += NETFLOW_UNIX_SECS_SIZE;
692                                 break;
693
694                         case NETFLOW_UNIX_NSECS:
695                                 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
696                                 p += NETFLOW_UNIX_NSECS_SIZE;
697                                 break;
698
699                         case NETFLOW_FLOW_SEQUENCE:
700                                 //*((uint32_t *) p) = htonl(emit_sequence);
701                                 *((uint32_t *) p) = 0;
702                                 p += NETFLOW_FLOW_SEQUENCE_SIZE;
703                                 break;
704
705                         case NETFLOW_PAD8:
706                         /* Unsupported (uint8_t) */
707                         case NETFLOW_ENGINE_TYPE:
708                         case NETFLOW_ENGINE_ID:
709                         case NETFLOW_FLAGS7_1:
710                         case NETFLOW_SRC_MASK:
711                         case NETFLOW_DST_MASK:
712                                 *((uint8_t *) p) = 0;
713                                 p += NETFLOW_PAD8_SIZE;
714                                 break;
715                         case NETFLOW_XID:
716                                 *((uint16_t *) p) = flow->tos;
717                                 p += NETFLOW_XID_SIZE;
718                                 break;
719                         case NETFLOW_PAD16:
720                         /* Unsupported (uint16_t) */
721                         case NETFLOW_SRC_AS:
722                         case NETFLOW_DST_AS:
723                         case NETFLOW_FLAGS7_2:
724                                 *((uint16_t *) p) = 0;
725                                 p += NETFLOW_PAD16_SIZE;
726                                 break;
727
728                         case NETFLOW_PAD32:
729                         /* Unsupported (uint32_t) */
730                         case NETFLOW_IPV4_NEXT_HOP:
731                         case NETFLOW_ROUTER_SC:
732                                 *((uint32_t *) p) = 0;
733                                 p += NETFLOW_PAD32_SIZE;
734                                 break;
735
736                         default:
737                                 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
738                                         format, i, format[i]);
739                                 exit(1);
740                 }
741         }
742 #if ((DEBUG) & DEBUG_F)
743         my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
744 #endif
745         return p;
746 }
747
748 void setuser() {
749         /*
750         Workaround for clone()-based threads
751         Try to change EUID independently of main thread
752         */
753         if (pw) {
754                 setgroups(0, NULL);
755                 setregid(pw->pw_gid, pw->pw_gid);
756                 setreuid(pw->pw_uid, pw->pw_uid);
757         }
758 }
759
760 void *emit_thread()
761 {
762         struct Flow *flow;
763         void *p;
764         struct timeval now;
765         struct timespec timeout;
766         int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
767
768         p = (void *) &emit_packet + netflow->HeaderSize;
769         timeout.tv_nsec = 0;
770
771         setuser();
772
773         for (;;) {
774                 pthread_mutex_lock(&emit_mutex);
775                 while (!flows_emit) {
776                         gettimeofday(&now, 0);
777                         timeout.tv_sec = now.tv_sec + emit_timeout;
778                         /* Do not wait until emit_packet will filled - it may be too long */
779                         if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
780                                 pthread_mutex_unlock(&emit_mutex);
781                                 goto sendit;
782                         }
783                 }
784                 flow = flows_emit;
785                 flows_emit = flows_emit->next;
786 #if ((DEBUG) & DEBUG_I)
787                 emit_queue--;
788 #endif          
789                 pthread_mutex_unlock(&emit_mutex);
790
791 #ifdef UPTIME_TRICK
792                 if (!emit_count) {
793                         gettime(&start_time);
794                         start_time.sec -= start_time_offset;
795                 }
796 #endif
797                 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
798                 mem_free(flow);
799                 emit_count++;
800 #ifdef PF2_DEBUG
801                 printf("Emit count = %d\n", emit_count);
802                 fflush(stdout);
803 #endif
804                 if (emit_count == netflow->MaxFlows) {
805                 sendit:
806                         gettime(&emit_time);
807                         p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
808                         size = netflow->HeaderSize + emit_count * netflow->FlowSize;
809                         /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
810                         if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
811                         peer_rot_cur = 0;
812                         for (i = 0; i < npeers; i++) {
813                                 if (peers[i].type == PEER_FILE) {
814                                                 if (netflow->SeqOffset)
815                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
816                                                 peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
817                                                 ret = write(peers[i].write_fd, emit_packet, size);
818                                                 if (ret < size) {
819
820 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
821                                                         my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
822                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
823 #endif
824 #undef MESSAGES
825                                                 }
826 #if ((DEBUG) & DEBUG_E)
827                                                 commaneelse {
828                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
829                                                                 emit_count, i + 1, peers[i].seq);
830                                                 }
831 #endif
832                                                 peers[i].seq += emit_count;
833
834                                                 /* Rate limit */
835                                                 if (emit_rate_bytes) {
836                                                         sent += size;
837                                                         delay = sent / emit_rate_bytes;
838                                                         if (delay) {
839                                                                 sent %= emit_rate_bytes;
840                                                                 timeout.tv_sec = 0;
841                                                                 timeout.tv_nsec = emit_rate_delay * delay;
842                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
843                                                         }
844                                                 }
845                                         }
846                                 else
847                                 if (peers[i].type == PEER_MIRROR) goto sendreal;
848                                 else
849                                 if (peers[i].type == PEER_ROTATE) 
850                                         if (peer_rot_cur++ == peer_rot_work) {
851                                         sendreal:
852                                                 if (netflow->SeqOffset)
853                                                         *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
854                                                 ret = send(peers[i].write_fd, emit_packet, size, 0);
855                                                 if (ret < size) {
856 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
857                                                         my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
858                                                                 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
859 #endif
860                                                 }
861 #if ((DEBUG) & DEBUG_E)
862                                                 commaneelse {
863                                                         my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
864                                                                 emit_count, i + 1, peers[i].seq);
865                                                 }
866 #endif
867                                                 peers[i].seq += emit_count;
868
869                                                 /* Rate limit */
870                                                 if (emit_rate_bytes) {
871                                                         sent += size;
872                                                         delay = sent / emit_rate_bytes;
873                                                         if (delay) {
874                                                                 sent %= emit_rate_bytes;
875                                                                 timeout.tv_sec = 0;
876                                                                 timeout.tv_nsec = emit_rate_delay * delay;
877                                                                 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
878                                                         }
879                                                 }
880                                         }
881                         }
882                         if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
883                         emit_sequence += emit_count;
884                         emit_count = 0;
885 #if ((DEBUG) & DEBUG_I)
886                         emit_pkts++;
887 #endif
888                 }
889         }
890 }       
891
892 void *unpending_thread()
893 {
894         struct timeval now;
895         struct timespec timeout;
896 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
897         char logbuf[256];
898 #endif
899
900         setuser();
901
902         timeout.tv_nsec = 0;
903         pthread_mutex_lock(&unpending_mutex);
904
905         for (;;) {
906                 while (!(pending_tail->flags & FLOW_PENDING)) {
907                         gettimeofday(&now, 0);
908                         timeout.tv_sec = now.tv_sec + unpending_timeout;
909                         pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
910                 }
911
912 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
913                 *logbuf = 0;
914 #endif
915                 if (put_into(pending_tail, COPY_INTO
916 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
917                                 , logbuf
918 #endif
919                         ) < 0) {
920 #if ((DEBUG) & DEBUG_I)
921                         pkts_lost_unpending++;
922 #endif                          
923                 }
924
925 #if ((DEBUG) & DEBUG_U)
926                 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
927 #endif
928
929                 pending_tail->flags = 0;
930                 pending_tail = pending_tail->next;
931 #if ((DEBUG) & DEBUG_I)
932                 pkts_pending_done++;
933 #endif
934         }
935 }
936
937 void *scan_thread()
938 {
939 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
940         char logbuf[256];
941 #endif
942         int i;
943         struct Flow *flow, **flowpp;
944         struct Time now;
945         struct timespec timeout;
946
947         setuser();
948
949         timeout.tv_nsec = 0;
950         pthread_mutex_lock(&scan_mutex);
951
952         for (;;) {
953                 gettime(&now);
954                 timeout.tv_sec = now.sec + scan_interval;
955                 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
956
957                 gettime(&now);
958 #if ((DEBUG) & DEBUG_S)
959                 my_log(LOG_DEBUG, "S: %d", now.sec);
960 #endif
961                 for (i = 0; i < 1 << HASH_BITS ; i++) {
962                         pthread_mutex_lock(&flows_mutex[i]);
963                         flow = flows[i];
964                         flowpp = &flows[i];
965                         while (flow) {
966                                 if (flow->flags & FLOW_FRAG) {
967                                         /* Process fragmented flow */
968                                         if ((now.sec - flow->mtime.sec) > frag_lifetime) {
969                                                 /* Fragmented flow expired - put it into special chain */
970 #if ((DEBUG) & DEBUG_I)
971                                                 flows_fragmented--;
972                                                 flows_total--;
973 #endif
974                                                 *flowpp = flow->next;
975                                                 flow->id = 0;
976                                                 flow->flags &= ~FLOW_FRAG;
977                                                 flow->next = scan_frag_dreg;
978                                                 scan_frag_dreg = flow;
979                                                 flow = *flowpp;
980                                                 continue;
981                                         }
982                                 } else {
983                                         /* Flow is not frgamented */
984                                         if ((now.sec - flow->mtime.sec) > inactive_lifetime
985                                                 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
986                                                 /* Flow expired */
987 #if ((DEBUG) & DEBUG_S)
988                                                 my_log(LOG_DEBUG, "S: E %x", flow);
989 #endif
990 #if ((DEBUG) & DEBUG_I)
991                                                 flows_total--;
992 #endif
993                                                 *flowpp = flow->next;
994                                                 pthread_mutex_lock(&emit_mutex);
995                                                 flow->next = flows_emit;
996                                                 flows_emit = flow;
997 #if ((DEBUG) & DEBUG_I)
998                                                 emit_queue++;
999 #endif                          
1000                                                 pthread_mutex_unlock(&emit_mutex);
1001                                                 flow = *flowpp;
1002                                                 continue;
1003                                         }
1004                                 }
1005                                 flowpp = &flow->next;
1006                                 flow = flow->next;
1007                         } /* chain loop */
1008                         pthread_mutex_unlock(&flows_mutex[i]);
1009                 } /* hash loop */
1010                 if (flows_emit) pthread_cond_signal(&emit_cond);
1011
1012                 while (scan_frag_dreg) {
1013                         flow = scan_frag_dreg;
1014                         scan_frag_dreg = flow->next;
1015 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1016                         *logbuf = 0;
1017 #endif
1018                         put_into(flow, MOVE_INTO
1019 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1020                                 , logbuf
1021 #endif
1022                         );
1023 #if ((DEBUG) & DEBUG_S)
1024                         my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
1025 #endif
1026                 }
1027         }
1028 }
1029
1030 void *cap_thread()
1031 {
1032         struct ulog_packet_msg *ulog_msg;
1033         struct ip *nl;
1034         void *tl;
1035         struct Flow *flow;
1036         int len, off_frag, psize;
1037 #if ((DEBUG) & DEBUG_C)
1038         char buf[64];
1039         char logbuf[256];
1040 #endif
1041
1042         setuser();
1043
1044         while (!killed) {
1045                 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1046                 if (len <= 0) {
1047                         my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1048                         continue;
1049                 }
1050                 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1051
1052 #if ((DEBUG) & DEBUG_C)
1053                         sprintf(logbuf, "C: %d", ulog_msg->data_len);
1054 #endif
1055
1056                         nl = (void *) &ulog_msg->payload;
1057                         psize = ulog_msg->data_len;
1058
1059                         /* Sanity check */
1060                         if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1061 #if ((DEBUG) & DEBUG_C)
1062                                 strcat(logbuf, " U");
1063                                 my_log(LOG_DEBUG, "%s", logbuf);
1064 #endif
1065 #if ((DEBUG) & DEBUG_I)
1066                                 pkts_ignored++;
1067 #endif
1068                                 continue;
1069                         }
1070
1071                         if (pending_head->flags) {
1072 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1073                                 my_log(LOG_ERR,
1074 # if ((DEBUG) & DEBUG_C)
1075                                         "%s %s %s", logbuf,
1076 # else
1077                                         "%s %s",
1078 # endif
1079                                         "pending queue full:", "packet lost");
1080 #endif
1081 #if ((DEBUG) & DEBUG_I)
1082                                 pkts_lost_capture++;
1083 #endif
1084                                 goto done;
1085                         }
1086
1087 #if ((DEBUG) & DEBUG_I)
1088                         pkts_total++;
1089 #endif
1090
1091                         flow = pending_head;
1092
1093                         /* ?FIXME? Add sanity check for ip_len? */
1094                         flow->size = ntohs(nl->ip_len);
1095 #if ((DEBUG) & DEBUG_I)
1096                         size_total += flow->size;
1097 #endif
1098
1099                         flow->sip = nl->ip_src;
1100                         flow->dip = nl->ip_dst;
1101                         if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
1102                                 my_log(LOG_INFO, "Received test flow to corewars.org");
1103                         }
1104                         flow->iif = snmp_index(ulog_msg->indev_name);
1105                         flow->oif = snmp_index(ulog_msg->outdev_name);
1106                         flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1107                         flow->proto = nl->ip_p;
1108                         flow->id = 0;
1109                         flow->tcp_flags = 0;
1110                         flow->pkts = 1;
1111                         flow->sizeF = 0;
1112                         flow->sizeP = 0;
1113                         /* Packets captured from OUTPUT table didn't contains valid timestamp */
1114                         if (ulog_msg->timestamp_sec) {
1115                                 flow->ctime.sec = ulog_msg->timestamp_sec;
1116                                 flow->ctime.usec = ulog_msg->timestamp_usec;
1117                         } else gettime(&flow->ctime);
1118                         flow->mtime = flow->ctime;
1119
1120                         off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1121
1122                         /*
1123                         Offset (from network layer) to transport layer header/IP data
1124                         IOW IP header size ;-)
1125
1126                         ?FIXME?
1127                         Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1128                         */
1129                         off_tl = nl->ip_hl << 2;
1130                         tl = (void *) nl + off_tl;
1131
1132                         /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1133                         flow->sizeF = ntohs(nl->ip_len) - off_tl;
1134                         psize -= off_tl;
1135                         if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1136                         if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1137
1138                         if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1139                                 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1140 #if ((DEBUG) & DEBUG_C)
1141                                 strcat(logbuf, " F");
1142 #endif
1143 #if ((DEBUG) & DEBUG_I)
1144                                 pkts_total_fragmented++;
1145 #endif
1146                                 flow->flags |= FLOW_FRAG;
1147                                 flow->id = nl->ip_id;
1148
1149                                 if (!(ntohs(nl->ip_off) & IP_MF)) {
1150                                         /* Packet whith IP_MF contains information about whole datagram size */
1151                                         flow->flags |= FLOW_LASTFRAG;
1152                                         /* size = frag_offset*8 + data_size */
1153                                         flow->sizeP = off_frag + flow->sizeF;
1154                                 }
1155                         }
1156
1157 #if ((DEBUG) & DEBUG_C)
1158                         sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1159                         strcat(logbuf, buf);
1160                         sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1161                         strcat(logbuf, buf);
1162 #endif
1163
1164                         /*
1165                         Fortunately most interesting transport layer information fit
1166                         into first 8 bytes of IP data field (minimal nonzero size).
1167                         Thus we don't need actual packet reassembling to build whole
1168                         transport layer data. We only check the fragment offset for
1169                         zero value to find packet with this information.
1170                         */
1171                         if (!off_frag && psize >= 8) {
1172                                 switch (flow->proto) {
1173                                         case IPPROTO_TCP:
1174                                         case IPPROTO_UDP:
1175                                                 flow->sp = ((struct udphdr *)tl)->uh_sport;
1176                                                 flow->dp = ((struct udphdr *)tl)->uh_dport;
1177                                                 goto tl_known;
1178
1179 #ifdef ICMP_TRICK
1180                                         case IPPROTO_ICMP:
1181                                                 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1182                                                 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1183                                                 goto tl_known;
1184 #endif
1185 #ifdef ICMP_TRICK_CISCO
1186                                         case IPPROTO_ICMP:
1187                                                 flow->dp = *((int32_t *) tl);
1188                                                 goto tl_known;
1189 #endif
1190
1191                                         default:
1192                                                 /* Unknown transport layer */
1193 #if ((DEBUG) & DEBUG_C)
1194                                                 strcat(logbuf, " U");
1195 #endif
1196                                                 flow->sp = 0;
1197                                                 flow->dp = 0;
1198                                                 break;
1199
1200                                         tl_known:
1201 #if ((DEBUG) & DEBUG_C)
1202                                                 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1203                                                 strcat(logbuf, buf);
1204 #endif
1205                                                 flow->flags |= FLOW_TL;
1206                                 }
1207                         }
1208
1209                         /* Check for tcp flags presence (including CWR and ECE). */
1210                         if (flow->proto == IPPROTO_TCP
1211                                 && off_frag < 16
1212                                 && psize >= 16 - off_frag) {
1213                                 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1214 #if ((DEBUG) & DEBUG_C)
1215                                 sprintf(buf, " TCP:%x", flow->tcp_flags);
1216                                 strcat(logbuf, buf);
1217 #endif
1218                         }
1219
1220 #if ((DEBUG) & DEBUG_C)
1221                         sprintf(buf, " => %x", (unsigned) flow);
1222                         strcat(logbuf, buf);
1223                         my_log(LOG_DEBUG, "%s", logbuf);
1224 #endif
1225
1226 #if ((DEBUG) & DEBUG_I)
1227                         pkts_pending++;
1228                         pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1229                         if (pending_queue_trace < pending_queue_trace_candidate)
1230                                 pending_queue_trace = pending_queue_trace_candidate;
1231 #endif
1232
1233                         /* Flow complete - inform unpending_thread() about it */
1234                         pending_head->flags |= FLOW_PENDING;
1235                         pending_head = pending_head->next;
1236                 done:
1237                         pthread_cond_signal(&unpending_cond);
1238                 }
1239         }
1240         return 0;
1241 }
1242
1243 int main(int argc, char **argv)
1244 {
1245         char errpbuf[512];
1246         char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1247         int c, i, write_fd, memory_limit = 0;
1248         struct addrinfo hints, *res;
1249         struct sockaddr_in saddr;
1250         pthread_attr_t tattr;
1251         struct sigaction sigact;
1252         static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1253         struct timeval timeout;
1254
1255         sched_min = sched_get_priority_min(SCHED);
1256         sched_max = sched_get_priority_max(SCHED);
1257
1258         memset(&saddr, 0 , sizeof(saddr));
1259         memset(&hints, 0 , sizeof(hints));
1260         hints.ai_flags = AI_PASSIVE;
1261         hints.ai_family = AF_INET;
1262         hints.ai_socktype = SOCK_DGRAM;
1263
1264         /* Process command line options */
1265
1266         opterr = 0;
1267         while ((c = my_getopt(argc, argv, parms)) != -1) {
1268                 switch (c) {
1269                         case '?':
1270                                 usage();
1271
1272                         case 'h':
1273                                 usage();
1274                 }
1275         }
1276
1277         if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1278         if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1279         if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1280         if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1281         if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1282         if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1283         if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1284         if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1285         if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1286         if (parms[nflag].count) {
1287                 switch (atoi(parms[nflag].arg)) {
1288                         case 1:
1289                                 netflow = &NetFlow1;
1290                                 break;
1291
1292                         case 5:
1293                                 break;
1294
1295                         case 7:
1296                                 netflow = &NetFlow7;
1297                                 break;
1298
1299                         default:
1300                                 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1301                                 exit(1);
1302                 }
1303         }
1304         if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1305         if (parms[lflag].count) {
1306                 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1307                         *log_suffix++ = 0;
1308                         if (*log_suffix) {
1309                                 sprintf(errpbuf, "[%s]", log_suffix);
1310                                 strcat(ident, errpbuf);
1311                         }
1312                 }
1313                 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1314                 if (log_suffix) *--log_suffix = ':';
1315         }
1316         if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1317         err_malloc:
1318                 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1319                 exit(1);
1320         }
1321         sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1322         if (parms[qflag].count) {
1323                 pending_queue_length = atoi(parms[qflag].arg);
1324                 if (pending_queue_length < 1) {
1325                         fprintf(stderr, "Illegal %s\n", "pending queue length");
1326                         exit(1);
1327                 }
1328         }
1329         if (parms[rflag].count) {
1330                 schedp.sched_priority = atoi(parms[rflag].arg);
1331                 if (schedp.sched_priority
1332                         && (schedp.sched_priority < sched_min
1333                                 || schedp.sched_priority > sched_max)) {
1334                         fprintf(stderr, "Illegal %s\n", "realtime priority");
1335                         exit(1);
1336                 }
1337         }
1338         if (parms[Bflag].count) {
1339                 sockbufsize = atoi(parms[Bflag].arg) << 10;
1340         }
1341         if (parms[bflag].count) {
1342                 bulk_quantity = atoi(parms[bflag].arg);
1343                 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1344                         fprintf(stderr, "Illegal %s\n", "bulk size");
1345                         exit(1);
1346                 }
1347         }
1348         if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1349         if (parms[Xflag].count) {
1350                 for(i = 0; parms[Xflag].arg[i]; i++)
1351                         if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1352                 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1353                         goto err_malloc;
1354                 rule = strtok(parms[Xflag].arg, ":");
1355                 for (i = 0; rule; i++) {
1356                         snmp_rules[i].len = strlen(rule);
1357                         if (snmp_rules[i].len > IFNAMSIZ) {
1358                                 fprintf(stderr, "Illegal %s\n", "interface basename");
1359                                 exit(1);
1360                         }
1361                         strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1362                         if (!*(rule - 1)) *(rule - 1) = ',';
1363                         rule = strtok(NULL, ",");
1364                         if (!rule) {
1365                                 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1366                                 exit(1);
1367                         }
1368                         snmp_rules[i].base = atoi(rule);
1369                         *(rule - 1) = ':';
1370                         rule = strtok(NULL, ":");
1371                 }
1372                 nsnmp_rules = i;
1373         }
1374         if (parms[tflag].count)
1375                 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1376         if (parms[aflag].count) {
1377                 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1378                 bad_lhost:
1379                         fprintf(stderr, "Illegal %s\n", "source address");
1380                         exit(1);
1381                 } else {
1382                         saddr = *((struct sockaddr_in *) res->ai_addr);
1383                         freeaddrinfo(res);
1384                 }
1385         }
1386         if (parms[uflag].count) 
1387                 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1388                         fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1389                         exit(1);
1390                 }
1391
1392
1393         /* Process collectors parameters. Brrrr... :-[ */
1394
1395         npeers = argc - optind;
1396         if (npeers > 1) {
1397                 /* Send to remote Netflow collector */
1398                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1399                 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1400                         dhost = argv[i];
1401                         if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1402                         *dport++ = 0;
1403                         if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1404                                 fprintf(stderr, "socket(): %s\n", strerror(errno));
1405                                 exit(1);
1406                         }
1407                         peers[npeers].write_fd = write_fd;
1408                         peers[npeers].type = PEER_MIRROR;
1409                         peers[npeers].laddr = saddr;
1410                         peers[npeers].seq = 0;
1411                         if ((lhost = strchr(dport, '/'))) {
1412                                 *lhost++ = 0;
1413                                 if ((type = strchr(lhost, '/'))) {
1414                                         *type++ = 0;
1415                                         switch (*type) {
1416                                                 case 0:
1417                                                 case 'm':
1418                                                         break;
1419
1420                                                 case 'r':
1421                                                         peers[npeers].type = PEER_ROTATE;
1422                                                         npeers_rot++;
1423                                                         break;
1424
1425                                                 default:
1426                                                         goto bad_collector;
1427                                         }
1428                                 }
1429                                 if (*lhost) {
1430                                         if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1431                                         peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1432                                         freeaddrinfo(res);
1433                                 }
1434                         }
1435                         if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1436                                                 sizeof(struct sockaddr_in))) {
1437                                 fprintf(stderr, "bind(): %s\n", strerror(errno));
1438                                 exit(1);
1439                         }
1440                         if (getaddrinfo(dhost, dport, &hints, &res)) {
1441 bad_collector:
1442                                 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1443                                 exit(1);
1444                         }
1445                         peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1446                         freeaddrinfo(res);
1447                         if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1448                                                 sizeof(struct sockaddr_in))) {
1449                                 fprintf(stderr, "connect(): %s\n", strerror(errno));
1450                                 exit(1);
1451                         }
1452
1453                         /* Restore command line */
1454                         if (type) *--type = '/';
1455                         if (lhost) *--lhost = '/';
1456                         *--dport = ':';
1457                 }
1458         }
1459         else if (parms[fflag].count) {
1460                 // log into a file
1461                 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1462                 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1463                 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1464                 
1465                 peers[npeers].write_fd = START_VALUE;
1466                 peers[npeers].type = PEER_FILE;
1467                 peers[npeers].seq = 0;
1468
1469                 get_cur_epoch();
1470                 npeers++;
1471         }
1472         else 
1473                 usage();
1474
1475
1476         if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1477         ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1478         if (!ulog_handle) {
1479                 fprintf(stderr, "libipulog initialization error: %s",
1480                         ipulog_strerror(ipulog_errno));
1481                 exit(1);
1482         }
1483         if (sockbufsize)
1484                 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1485                         &sockbufsize, sizeof(sockbufsize)) < 0)
1486                         fprintf(stderr, "setsockopt(): %s", strerror(errno));
1487
1488         /* Daemonize (if log destination stdout-free) */
1489
1490         my_log_open(ident, verbosity, log_dest);
1491         if (!(log_dest & 2)) {
1492                 /* Crash-proofing - Sapan*/
1493                 while (1) {
1494                         int pid=fork();
1495                         if (pid==-1) {
1496                                         fprintf(stderr, "fork(): %s", strerror(errno));
1497                                         exit(1);
1498                         }
1499                         else if (pid==0) {
1500                                         setsid();
1501                                         freopen("/dev/null", "r", stdin);
1502                                         freopen("/dev/null", "w", stdout);
1503                                         freopen("/dev/null", "w", stderr);
1504                                         break;
1505                         }
1506                         else {
1507                                 while (wait3(NULL,0,NULL) < 1);
1508                         }
1509                 }
1510         } else {
1511                 setvbuf(stdout, (char *)0, _IONBF, 0);
1512                 setvbuf(stderr, (char *)0, _IONBF, 0);
1513         }
1514
1515         pid = getpid();
1516         sprintf(errpbuf, "[%ld]", (long) pid);
1517         strcat(ident, errpbuf);
1518
1519         /* Initialization */
1520
1521         hash_init(); /* Actually for crc16 only */
1522         mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1523         for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1524
1525 #ifdef UPTIME_TRICK
1526         /* Hope 12 days is enough :-/ */
1527         start_time_offset = 1 << 20;
1528
1529         /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1530 #endif
1531         gettime(&start_time);
1532
1533         /*
1534         Build static pending queue as circular buffer.
1535         */
1536         if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1537         pending_tail = pending_head;
1538         for (i = pending_queue_length - 1; i--;) {
1539                 if (!(pending_tail->next = mem_alloc())) {
1540                 err_mem_alloc:
1541                         my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1542                         exit(1);
1543                 }
1544                 pending_tail = pending_tail->next;
1545         }
1546         pending_tail->next = pending_head;
1547         pending_tail = pending_head;
1548
1549         sigemptyset(&sig_mask);
1550         sigact.sa_handler = &sighandler;
1551         sigact.sa_mask = sig_mask;
1552         sigact.sa_flags = 0;
1553         sigaddset(&sig_mask, SIGTERM);
1554         sigaction(SIGTERM, &sigact, 0);
1555 #if ((DEBUG) & DEBUG_I)
1556         sigaddset(&sig_mask, SIGUSR1);
1557         sigaction(SIGUSR1, &sigact, 0);
1558 #endif
1559         if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1560                 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1561                 exit(1);
1562         }
1563
1564         my_log(LOG_INFO, "Starting %s...", VERSION);
1565
1566         if (parms[cflag].count) {
1567                 if (chdir(parms[cflag].arg) || chroot(".")) {
1568                         my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1569                         exit(1);
1570                 }
1571         }
1572
1573         schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1574         pthread_attr_init(&tattr);
1575         for (i = 0; i < THREADS - 1; i++) {
1576                 if (schedp.sched_priority > 0) {
1577                         if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1578                                 (pthread_attr_setschedparam(&tattr, &schedp))) {
1579                                 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1580                                 exit(1);
1581                         }
1582                 }
1583                 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1584                         my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1585                         exit(1);
1586                 }
1587                 pthread_detach(thid);
1588                 schedp.sched_priority++;
1589         }
1590
1591         if (pw) {
1592                 if (setgroups(0, NULL)) {
1593                         my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1594                         exit(1);
1595                 }
1596                 if (setregid(pw->pw_gid, pw->pw_gid)) {
1597                         my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1598                         exit(1);
1599                 }
1600                 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1601                         my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1602                         exit(1);
1603                 }
1604         }
1605
1606         if (!(pidfile = fopen(pidfilepath, "w")))
1607                 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1608         else {
1609                 fprintf(pidfile, "%ld\n", (long) pid);
1610                 fclose(pidfile);
1611         }
1612
1613         my_log(LOG_INFO, "pid: %d", pid);
1614         my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1615                 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1616                 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1617                 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1618                 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1619                 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1620                 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1621         for (i = 0; i < nsnmp_rules; i++) {
1622                 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1623                         i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1624         }
1625         for (i = 0; i < npeers; i++) {
1626                 switch (peers[i].type) {
1627                         case PEER_MIRROR:
1628                                 c = 'm';
1629                                 break;
1630                         case PEER_ROTATE:
1631                                 c = 'r';
1632                                 break;
1633                 }
1634                 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1635                 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1636                         inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1637         }
1638
1639         pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1640
1641         timeout.tv_usec = 0;
1642         while (!killed
1643                 || (total_elements - free_elements - pending_queue_length)
1644                 || emit_count
1645                 || pending_tail->flags) {
1646
1647                 if (!sigs) {
1648                         timeout.tv_sec = scan_interval;
1649                         select(0, 0, 0, 0, &timeout);
1650                 }
1651
1652                 if (sigs & SIGTERM_MASK && !killed) {
1653                         sigs &= ~SIGTERM_MASK;
1654                         my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1655                         scan_interval = 1;
1656                         frag_lifetime = -1;
1657                         active_lifetime = -1;
1658                         inactive_lifetime = -1;
1659                         emit_timeout = 1;
1660                         unpending_timeout = 1;
1661                         killed = 1;
1662                         pthread_cond_signal(&scan_cond);
1663                         pthread_cond_signal(&unpending_cond);
1664                 }
1665
1666 #if ((DEBUG) & DEBUG_I)
1667                 if (sigs & SIGUSR1_MASK) {
1668                         sigs &= ~SIGUSR1_MASK;
1669                         info_debug();
1670                 }
1671 #endif
1672         }
1673         remove(pidfilepath);
1674 #if ((DEBUG) & DEBUG_I)
1675         info_debug();
1676 #endif
1677         my_log(LOG_INFO, "Done.");
1678 #ifdef WALL
1679         return 0;
1680 #endif
1681 }