Bug fix.
authorsapanb <sapanb@8c455092-636d-4788-adf5-e71def0336e8>
Thu, 17 Apr 2008 05:49:49 +0000 (05:49 +0000)
committersapanb <sapanb@8c455092-636d-4788-adf5-e71def0336e8>
Thu, 17 Apr 2008 05:49:49 +0000 (05:49 +0000)
git-svn-id: http://svn.planet-lab.org/svn/fprobe-ulog/trunk@9044 8c455092-636d-4788-adf5-e71def0336e8

src/fprobe-ulog.c

index 1e77269..8216b64 100644 (file)
@@ -178,7 +178,7 @@ static int sockbufsize;
 #endif
 
 static unsigned epoch_length=60, log_epochs=1; 
-static unsigned cur_epoch=0,prev_uptime=0;
+static unsigned cur_epoch=0,prev_uptime=0,last_peak=0;
 
 static unsigned bulk_quantity = BULK_QUANTITY;
 static unsigned pending_queue_length = 100;
@@ -245,37 +245,37 @@ static struct passwd *pw = 0;
 void usage()
 {
        fprintf(stdout,
-               "fprobe-ulog: a NetFlow probe. Version %s\n"
-               "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
-               "\n"
-               "-h\t\tDisplay this help\n"
-               "-U <mask>\tULOG group bitwise mask [1]\n"
-               "-s <seconds>\tHow often scan for expired flows [5]\n"
-               "-g <seconds>\tFragmented flow lifetime [30]\n"
-               "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
-               "-f <filename>\tLog flow data in a file\n"
-               "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
-               "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
-               "-a <address>\tUse <address> as source for NetFlow flow\n"
-               "-X <rules>\tInterface name to SNMP-index conversion rules\n"
-               "-M\t\tUse netfilter mark value as ToS flag\n"
-               "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
-               "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
-               "-q <flows>\tPending queue length [100]\n"
-               "-B <kilobytes>\tKernel capture buffer size [0]\n"
-               "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
-               "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
-               "-c <directory>\tDirectory to chroot to\n"
-               "-u <user>\tUser to run as\n"
-               "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
-               "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
-               "-y <remote:port>\tAddress of the NetFlow collector\n"
-               "-f <writable file>\tFile to write data into\n"
-               "-T <n>\tRotate log file every n epochs\n"
-               "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
-               "-E <[1..60]>\tSize of an epoch in minutes\n"
-               "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
-               ,
+                       "fprobe-ulog: a NetFlow probe. Version %s\n"
+                       "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
+                       "\n"
+                       "-h\t\tDisplay this help\n"
+                       "-U <mask>\tULOG group bitwise mask [1]\n"
+                       "-s <seconds>\tHow often scan for expired flows [5]\n"
+                       "-g <seconds>\tFragmented flow lifetime [30]\n"
+                       "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
+                       "-f <filename>\tLog flow data in a file\n"
+                       "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
+                       "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
+                       "-a <address>\tUse <address> as source for NetFlow flow\n"
+                       "-X <rules>\tInterface name to SNMP-index conversion rules\n"
+                       "-M\t\tUse netfilter mark value as ToS flag\n"
+                       "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
+                       "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
+                       "-q <flows>\tPending queue length [100]\n"
+                       "-B <kilobytes>\tKernel capture buffer size [0]\n"
+                       "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
+                       "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
+                       "-c <directory>\tDirectory to chroot to\n"
+                       "-u <user>\tUser to run as\n"
+                       "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
+                       "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
+                       "-y <remote:port>\tAddress of the NetFlow collector\n"
+                       "-f <writable file>\tFile to write data into\n"
+                       "-T <n>\tRotate log file every n epochs\n"
+                       "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
+                       "-E <[1..60]>\tSize of an epoch in minutes\n"
+                       "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
+                       ,
                VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
        exit(0);
 }
@@ -284,14 +284,14 @@ void usage()
 void info_debug()
 {
        my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
-               pkts_total, pkts_total_fragmented, size_total,
-               pkts_pending - pkts_pending_done, pending_queue_trace);
+                       pkts_total, pkts_total_fragmented, size_total,
+                       pkts_pending - pkts_pending_done, pending_queue_trace);
        my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
-               pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
+                       pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
        my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
-               flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
+                       flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
        my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
-               total_elements, free_elements, total_memory);
+                       total_elements, free_elements, total_memory);
 }
 #endif
 
@@ -433,15 +433,17 @@ unsigned get_log_fd(char *fname, int cur_fd) {
                        my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
                }
                else {
-                       if (min_free && (statfs.f_bavail < min_free)) 
-                               switch(cur_epoch) {
-                                       case 0: /* Uh oh. Our first file filled up all of the free space. Just bail out. */
-                                               my_log(LOG_ERR, "The first epoch filled up all the free space on disk. Bailing out.");
-                                               exit(1);
-                                       default:
-                                               my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
-                                               cur_epoch = -1;
-                               }
+                       if (min_free && (statfs.f_bavail < min_free)
+                                       && (cur_epoch==last_peak))
+                       {
+                               my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
+                               cur_epoch = -1;
+                       }
+                       /*
+                          else 
+                          assume that we can reclaim space by overwriting our own files
+                          and that the difference in size will not fill the disk - sapan
+                          */
                }
        }
 
@@ -451,6 +453,7 @@ unsigned get_log_fd(char *fname, int cur_fd) {
                int write_fd;
                prev_uptime = cur_uptime;
                cur_epoch = (cur_epoch + 1) % log_epochs;
+               last_peak = cur_epoch;
                if (cur_fd>0)
                        close(cur_fd);
                snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
@@ -479,13 +482,13 @@ struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
 
        while (where) {
                if (where->sip.s_addr == what->sip.s_addr
-                       && where->dip.s_addr == what->dip.s_addr
-                       && where->proto == what->proto) {
+                               && where->dip.s_addr == what->dip.s_addr
+                               && where->proto == what->proto) {
                        switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
                                case 0:
                                        /* Both unfragmented */
                                        if ((what->sp == where->sp)
-                                               && (what->dp == where->dp)) goto done;
+                                                       && (what->dp == where->dp)) goto done;
                                        break;
                                case 2:
                                        /* Both fragmented */
@@ -501,11 +504,11 @@ done:
        return where;
 }
 
-int put_into(struct Flow *flow, int flag
+       int put_into(struct Flow *flow, int flag
 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
-       , char *logbuf
+                       , char *logbuf
 #endif
-)
+                   )
 {
        int ret = 0;
        hash_t h;
@@ -530,7 +533,7 @@ int put_into(struct Flow *flow, int flag
                        } else {
 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
                                my_log(LOG_ERR, "%s %s. %s",
-                                       "mem_alloc():", strerror(errno), "packet lost");
+                                               "mem_alloc():", strerror(errno), "packet lost");
 #endif
                                return -1;
                        }
@@ -564,23 +567,23 @@ int put_into(struct Flow *flow, int flag
                        /* Fragmented flow require some additional work */
                        if (flow->flags & FLOW_TL) {
                                /*
-                               ?FIXME?
-                               Several packets with FLOW_TL (attack)
-                               */
+                                  ?FIXME?
+                                  Several packets with FLOW_TL (attack)
+                                  */
                                flown->sp = flow->sp;
                                flown->dp = flow->dp;
                        }
                        if (flow->flags & FLOW_LASTFRAG) {
                                /*
-                               ?FIXME?
-                               Several packets with FLOW_LASTFRAG (attack)
-                               */
+                                  ?FIXME?
+                                  Several packets with FLOW_LASTFRAG (attack)
+                                  */
                                flown->sizeP = flow->sizeP;
                        }
                        flown->flags |= flow->flags;
                        flown->sizeF += flow->sizeF;
                        if ((flown->flags & FLOW_LASTFRAG)
-                               && (flown->sizeF >= flown->sizeP)) {
+                                       && (flown->sizeF >= flown->sizeP)) {
                                /* All fragments received - flow reassembled */
                                *flowpp = flown->next;
                                pthread_mutex_unlock(&flows_mutex[h]);
@@ -597,7 +600,7 @@ int put_into(struct Flow *flow, int flag
 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
                                                , logbuf
 #endif
-                                       );
+                                             );
                        }
                }
                if (flag == MOVE_INTO) mem_free(flow);
@@ -715,7 +718,7 @@ void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
                                break;
 
                        case NETFLOW_PAD8:
-                       /* Unsupported (uint8_t) */
+                               /* Unsupported (uint8_t) */
                        case NETFLOW_ENGINE_TYPE:
                        case NETFLOW_ENGINE_ID:
                        case NETFLOW_FLAGS7_1:
@@ -729,7 +732,7 @@ void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
                                p += NETFLOW_XID_SIZE;
                                break;
                        case NETFLOW_PAD16:
-                       /* Unsupported (uint16_t) */
+                               /* Unsupported (uint16_t) */
                        case NETFLOW_SRC_AS:
                        case NETFLOW_DST_AS:
                        case NETFLOW_FLAGS7_2:
@@ -738,7 +741,7 @@ void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
                                break;
 
                        case NETFLOW_PAD32:
-                       /* Unsupported (uint32_t) */
+                               /* Unsupported (uint32_t) */
                        case NETFLOW_IPV4_NEXT_HOP:
                        case NETFLOW_ROUTER_SC:
                                *((uint32_t *) p) = 0;
@@ -747,7 +750,7 @@ void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
 
                        default:
                                my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
-                                       format, i, format[i]);
+                                               format, i, format[i]);
                                exit(1);
                }
        }
@@ -759,9 +762,9 @@ void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
 
 void setuser() {
        /*
-       Workaround for clone()-based threads
-       Try to change EUID independently of main thread
-       */
+          Workaround for clone()-based threads
+          Try to change EUID independently of main thread
+          */
        if (pw) {
                setgroups(0, NULL);
                setregid(pw->pw_gid, pw->pw_gid);
@@ -814,7 +817,7 @@ void *emit_thread()
                fflush(stdout);
 #endif
                if (emit_count == netflow->MaxFlows) {
-               sendit:
+sendit:
                        gettime(&emit_time);
                        p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
                        size = netflow->HeaderSize + emit_count * netflow->FlowSize;
@@ -825,73 +828,73 @@ void *emit_thread()
                        peer_rot_cur = 0;
                        for (i = 0; i < npeers; i++) {
                                if (peers[i].type == PEER_FILE) {
-                                               if (netflow->SeqOffset)
-                                                       *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
-                                               peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
-                                               ret = write(peers[i].write_fd, emit_packet, size);
-                                               if (ret < size) {
+                                       if (netflow->SeqOffset)
+                                               *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
+                                       peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
+                                       ret = write(peers[i].write_fd, emit_packet, size);
+                                       if (ret < size) {
 
 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
-                                                       my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
+                                               my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
                                                                i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
 #endif
 #undef MESSAGES
-                                               }
+                                       }
 #if ((DEBUG) & DEBUG_E)
-                                               commaneelse {
-                                                       my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
+                                       commaneelse {
+                                               my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
                                                                emit_count, i + 1, peers[i].seq);
-                                               }
+                                       }
 #endif
-                                               peers[i].seq += emit_count;
-
-                                               /* Rate limit */
-                                               if (emit_rate_bytes) {
-                                                       sent += size;
-                                                       delay = sent / emit_rate_bytes;
-                                                       if (delay) {
-                                                               sent %= emit_rate_bytes;
-                                                               timeout.tv_sec = 0;
-                                                               timeout.tv_nsec = emit_rate_delay * delay;
-                                                               while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
-                                                       }
+                                       peers[i].seq += emit_count;
+
+                                       /* Rate limit */
+                                       if (emit_rate_bytes) {
+                                               sent += size;
+                                               delay = sent / emit_rate_bytes;
+                                               if (delay) {
+                                                       sent %= emit_rate_bytes;
+                                                       timeout.tv_sec = 0;
+                                                       timeout.tv_nsec = emit_rate_delay * delay;
+                                                       while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
                                                }
                                        }
+                               }
                                else
-                               if (peers[i].type == PEER_MIRROR) goto sendreal;
-                               else
-                               if (peers[i].type == PEER_ROTATE) 
-                                       if (peer_rot_cur++ == peer_rot_work) {
-                                       sendreal:
-                                               if (netflow->SeqOffset)
-                                                       *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
-                                               ret = send(peers[i].write_fd, emit_packet, size, 0);
-                                               if (ret < size) {
+                                       if (peers[i].type == PEER_MIRROR) goto sendreal;
+                                       else
+                                               if (peers[i].type == PEER_ROTATE) 
+                                                       if (peer_rot_cur++ == peer_rot_work) {
+sendreal:
+                                                               if (netflow->SeqOffset)
+                                                                       *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
+                                                               ret = send(peers[i].write_fd, emit_packet, size, 0);
+                                                               if (ret < size) {
 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
-                                                       my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
-                                                               i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
+                                                                       my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
+                                                                                       i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
 #endif
-                                               }
+                                                               }
 #if ((DEBUG) & DEBUG_E)
-                                               commaneelse {
-                                                       my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
-                                                               emit_count, i + 1, peers[i].seq);
-                                               }
+                                                               commaneelse {
+                                                                       my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
+                                                                                       emit_count, i + 1, peers[i].seq);
+                                                               }
 #endif
-                                               peers[i].seq += emit_count;
-
-                                               /* Rate limit */
-                                               if (emit_rate_bytes) {
-                                                       sent += size;
-                                                       delay = sent / emit_rate_bytes;
-                                                       if (delay) {
-                                                               sent %= emit_rate_bytes;
-                                                               timeout.tv_sec = 0;
-                                                               timeout.tv_nsec = emit_rate_delay * delay;
-                                                               while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
+                                                               peers[i].seq += emit_count;
+
+                                                               /* Rate limit */
+                                                               if (emit_rate_bytes) {
+                                                                       sent += size;
+                                                                       delay = sent / emit_rate_bytes;
+                                                                       if (delay) {
+                                                                               sent %= emit_rate_bytes;
+                                                                               timeout.tv_sec = 0;
+                                                                               timeout.tv_nsec = emit_rate_delay * delay;
+                                                                               while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
+                                                                       }
+                                                               }
                                                        }
-                                               }
-                                       }
                        }
                        if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
                        emit_sequence += emit_count;
@@ -928,9 +931,9 @@ void *unpending_thread()
 #endif
                if (put_into(pending_tail, COPY_INTO
 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
-                               , logbuf
+                                       , logbuf
 #endif
-                       ) < 0) {
+                           ) < 0) {
 #if ((DEBUG) & DEBUG_I)
                        pkts_lost_unpending++;
 #endif                         
@@ -996,7 +999,7 @@ void *scan_thread()
                                } else {
                                        /* Flow is not frgamented */
                                        if ((now.sec - flow->mtime.sec) > inactive_lifetime
-                                               || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
+                                                       || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
                                                /* Flow expired */
 #if ((DEBUG) & DEBUG_S)
                                                my_log(LOG_DEBUG, "S: E %x", flow);
@@ -1031,9 +1034,9 @@ void *scan_thread()
 #endif
                        put_into(flow, MOVE_INTO
 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
-                               , logbuf
+                                       , logbuf
 #endif
-                       );
+                               );
 #if ((DEBUG) & DEBUG_S)
                        my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
 #endif
@@ -1086,11 +1089,11 @@ void *cap_thread()
 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
                                my_log(LOG_ERR,
 # if ((DEBUG) & DEBUG_C)
-                                       "%s %s %s", logbuf,
+                                               "%s %s %s", logbuf,
 # else
-                                       "%s %s",
+                                               "%s %s",
 # endif
-                                       "pending queue full:", "packet lost");
+                                               "pending queue full:", "packet lost");
 #endif
 #if ((DEBUG) & DEBUG_I)
                                pkts_lost_capture++;
@@ -1134,12 +1137,12 @@ void *cap_thread()
                        off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
 
                        /*
-                       Offset (from network layer) to transport layer header/IP data
-                       IOW IP header size ;-)
+                          Offset (from network layer) to transport layer header/IP data
+                          IOW IP header size ;-)
 
-                       ?FIXME?
-                       Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
-                       */
+                          ?FIXME?
+                          Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
+                          */
                        off_tl = nl->ip_hl << 2;
                        tl = (void *) nl + off_tl;
 
@@ -1176,12 +1179,12 @@ void *cap_thread()
 #endif
 
                        /*
-                       Fortunately most interesting transport layer information fit
-                       into first 8 bytes of IP data field (minimal nonzero size).
-                       Thus we don't need actual packet reassembling to build whole
-                       transport layer data. We only check the fragment offset for
-                       zero value to find packet with this information.
-                       */
+                          Fortunately most interesting transport layer information fit
+                          into first 8 bytes of IP data field (minimal nonzero size).
+                          Thus we don't need actual packet reassembling to build whole
+                          transport layer data. We only check the fragment offset for
+                          zero value to find packet with this information.
+                          */
                        if (!off_frag && psize >= 8) {
                                switch (flow->proto) {
                                        case IPPROTO_TCP:
@@ -1211,7 +1214,7 @@ void *cap_thread()
                                                flow->dp = 0;
                                                break;
 
-                                       tl_known:
+tl_known:
 #if ((DEBUG) & DEBUG_C)
                                                sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
                                                strcat(logbuf, buf);
@@ -1222,8 +1225,8 @@ void *cap_thread()
 
                        /* Check for tcp flags presence (including CWR and ECE). */
                        if (flow->proto == IPPROTO_TCP
-                               && off_frag < 16
-                               && psize >= 16 - off_frag) {
+                                       && off_frag < 16
+                                       && psize >= 16 - off_frag) {
                                flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
 #if ((DEBUG) & DEBUG_C)
                                sprintf(buf, " TCP:%x", flow->tcp_flags);
@@ -1247,7 +1250,7 @@ void *cap_thread()
                        /* Flow complete - inform unpending_thread() about it */
                        pending_head->flags |= FLOW_PENDING;
                        pending_head = pending_head->next;
-               done:
+done:
                        pthread_cond_signal(&unpending_cond);
                }
        }
@@ -1257,39 +1260,39 @@ void *cap_thread()
 /* Copied out of CoDemux */
 
 static int init_daemon() {
-  pid_t pid;
-  FILE *pidfile;
-
-  pidfile = fopen(PIDFILE, "w");
-  if (pidfile == NULL) {
-    my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
-  }
-
-  if ((pid = fork()) < 0) {
-    fclose(pidfile);
-    my_log(LOG_ERR, "Could not fork!\n");
-    return(-1);
-  }
-  else if (pid != 0) {
-    /* i'm the parent, writing down the child pid  */
-    fprintf(pidfile, "%u\n", pid);
-    fclose(pidfile);
-    exit(0);
-  }
-
-  /* close the pid file */
-  fclose(pidfile);
-
-  /* routines for any daemon process
-     1. create a new session 
-     2. change directory to the root
-     3. change the file creation permission 
-  */
-  setsid();
-  chdir("/var/local/fprobe");
-  umask(0);
-
-  return(0);
+       pid_t pid;
+       FILE *pidfile;
+
+       pidfile = fopen(PIDFILE, "w");
+       if (pidfile == NULL) {
+               my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
+       }
+
+       if ((pid = fork()) < 0) {
+               fclose(pidfile);
+               my_log(LOG_ERR, "Could not fork!\n");
+               return(-1);
+       }
+       else if (pid != 0) {
+               /* i'm the parent, writing down the child pid  */
+               fprintf(pidfile, "%u\n", pid);
+               fclose(pidfile);
+               exit(0);
+       }
+
+       /* close the pid file */
+       fclose(pidfile);
+
+       /* routines for any daemon process
+          1. create a new session 
+          2. change directory to the root
+          3. change the file creation permission 
+          */
+       setsid();
+       chdir("/var/local/fprobe");
+       umask(0);
+
+       return(0);
 }
 
 int main(int argc, char **argv)
@@ -1366,7 +1369,7 @@ int main(int argc, char **argv)
                if (log_suffix) *--log_suffix = ':';
        }
        if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
-       err_malloc:
+err_malloc:
                fprintf(stderr, "malloc(): %s\n", strerror(errno));
                exit(1);
        }
@@ -1381,8 +1384,8 @@ int main(int argc, char **argv)
        if (parms[rflag].count) {
                schedp.sched_priority = atoi(parms[rflag].arg);
                if (schedp.sched_priority
-                       && (schedp.sched_priority < sched_min
-                               || schedp.sched_priority > sched_max)) {
+                               && (schedp.sched_priority < sched_min
+                                       || schedp.sched_priority > sched_max)) {
                        fprintf(stderr, "Illegal %s\n", "realtime priority");
                        exit(1);
                }
@@ -1427,7 +1430,7 @@ int main(int argc, char **argv)
                sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
        if (parms[aflag].count) {
                if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
-               bad_lhost:
+bad_lhost:
                        fprintf(stderr, "Illegal %s\n", "source address");
                        exit(1);
                } else {
@@ -1513,7 +1516,7 @@ bad_collector:
                if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
                if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
                strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
-               
+
                peers[npeers].write_fd = START_VALUE;
                peers[npeers].type = PEER_FILE;
                peers[npeers].seq = 0;
@@ -1529,12 +1532,12 @@ bad_collector:
        ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
        if (!ulog_handle) {
                fprintf(stderr, "libipulog initialization error: %s",
-                       ipulog_strerror(ipulog_errno));
+                               ipulog_strerror(ipulog_errno));
                exit(1);
        }
        if (sockbufsize)
                if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
-                       &sockbufsize, sizeof(sockbufsize)) < 0)
+                                       &sockbufsize, sizeof(sockbufsize)) < 0)
                        fprintf(stderr, "setsockopt(): %s", strerror(errno));
 
        /* Daemonize (if log destination stdout-free) */
@@ -1548,15 +1551,15 @@ bad_collector:
                while (1) {
                        int pid=fork();
                        if (pid==-1) {
-                                       fprintf(stderr, "fork(): %s", strerror(errno));
-                                       exit(1);
+                               fprintf(stderr, "fork(): %s", strerror(errno));
+                               exit(1);
                        }
                        else if (pid==0) {
-                                       setsid();
-                                       freopen("/dev/null", "r", stdin);
-                                       freopen("/dev/null", "w", stdout);
-                                       freopen("/dev/null", "w", stderr);
-                                       break;
+                               setsid();
+                               freopen("/dev/null", "r", stdin);
+                               freopen("/dev/null", "w", stdout);
+                               freopen("/dev/null", "w", stderr);
+                               break;
                        }
                        else {
                                while (wait3(NULL,0,NULL) < 1);
@@ -1586,13 +1589,13 @@ bad_collector:
        gettime(&start_time);
 
        /*
-       Build static pending queue as circular buffer.
-       */
+          Build static pending queue as circular buffer.
+          */
        if (!(pending_head = mem_alloc())) goto err_mem_alloc;
        pending_tail = pending_head;
        for (i = pending_queue_length - 1; i--;) {
                if (!(pending_tail->next = mem_alloc())) {
-               err_mem_alloc:
+err_mem_alloc:
                        my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
                        exit(1);
                }
@@ -1630,7 +1633,7 @@ bad_collector:
        for (i = 0; i < THREADS - 1; i++) {
                if (schedp.sched_priority > 0) {
                        if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
-                               (pthread_attr_setschedparam(&tattr, &schedp))) {
+                                       (pthread_attr_setschedparam(&tattr, &schedp))) {
                                my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
                                exit(1);
                        }
@@ -1667,15 +1670,15 @@ bad_collector:
 
        my_log(LOG_INFO, "pid: %d", pid);
        my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
-               "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
-               ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
-               netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
-               memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
-               emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
-               parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
+                       "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
+                       ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
+                       netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
+                       memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
+                       emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
+                       parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
        for (i = 0; i < nsnmp_rules; i++) {
                my_log(LOG_INFO, "SNMP rule #%d %s:%d",
-                       i + 1, snmp_rules[i].basename, snmp_rules[i].base);
+                               i + 1, snmp_rules[i].basename, snmp_rules[i].base);
        }
        for (i = 0; i < npeers; i++) {
                switch (peers[i].type) {
@@ -1688,16 +1691,16 @@ bad_collector:
                }
                snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
                my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
-                       inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
+                               inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
        }
 
        pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
 
        timeout.tv_usec = 0;
        while (!killed
-               || (total_elements - free_elements - pending_queue_length)
-               || emit_count
-               || pending_tail->flags) {
+                       || (total_elements - free_elements - pending_queue_length)
+                       || emit_count
+                       || pending_tail->flags) {
 
                if (!sigs) {
                        timeout.tv_sec = scan_interval;