Decrease granularity to reduce the size of the dataset
[fprobe-ulog.git] / src / fprobe-ulog.c
index 65529ae..8c66067 100644 (file)
@@ -5,6 +5,14 @@
        modify it under the terms of the GNU General Public License.
 
        $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
+
+                       Sapan Bhatia <sapanb@cs.princeton.edu> 
+                       
+       7/11/2007       Added data collection (-f) functionality, xid support in the header and log file
+                       rotation.
+
+       15/11/2007      Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
+
 */
 
 #include <common.h>
 /* sig*() */
 #include <signal.h>
 
+/* statfs() */
+
+#include <sys/vfs.h>
+
 #include <libipulog/libipulog.h>
 struct ipulog_handle {
        int fd;
@@ -83,12 +95,16 @@ struct ipulog_handle {
 #include <hash.h>
 #include <mem.h>
 
+#define PIDFILE "/var/log/fprobe-ulog.pid"
+#define STD_NETFLOW_PDU
+
 enum {
        aflag,
        Bflag,
        bflag,
        cflag,
        dflag,
+       Dflag,
        eflag,
        Eflag,
        fflag,
@@ -106,6 +122,7 @@ enum {
        Uflag,
        uflag,
        vflag,
+       Wflag,
        Xflag,
 };
 
@@ -115,6 +132,7 @@ static struct getopt_parms parms[] = {
        {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
        {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
        {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
+       {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
        {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
        {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
        {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
@@ -144,8 +162,10 @@ extern struct NetFlow NetFlow1;
 extern struct NetFlow NetFlow5;
 extern struct NetFlow NetFlow7;
 
+#define START_VALUE -5
 #define mark_is_tos parms[Mflag].count
 static unsigned scan_interval = 5;
+static int min_free = 0;
 static int frag_lifetime = 30;
 static int inactive_lifetime = 60;
 static int active_lifetime = 300;
@@ -252,7 +272,10 @@ void usage()
                "-y <remote:port>\tAddress of the NetFlow collector\n"
                "-f <writable file>\tFile to write data into\n"
                "-T <n>\tRotate log file every n epochs\n"
-               "-E <[1..60]>\tSize of an epoch in minutes\n",
+               "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
+               "-E <[1..60]>\tSize of an epoch in minutes\n"
+               "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
+               ,
                VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
        exit(0);
 }
@@ -295,6 +318,12 @@ void gettime(struct Time *now)
        now->usec = t.tv_usec;
 }
 
+
+inline time_t cmpMtime(struct Time *t1, struct Time *t2)
+{
+       return (t1->sec - t2->sec)/60;
+}
+
 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
 {
        return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
@@ -307,6 +336,13 @@ uint32_t getuptime(struct Time *t)
        return cmpmtime(t, &start_time);
 }
 
+/* Uptime in minutes */
+uint32_t getuptime_minutes(struct Time *t)
+{
+       /* Maximum uptime is about 49/2 days */
+       return cmpMtime(t, &start_time);
+}
+
 hash_t hash_flow(struct Flow *flow)
 {
        if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
@@ -349,29 +385,85 @@ inline void copy_flow(struct Flow *src, struct Flow *dst)
        dst->flags = src->flags;
 }
 
-unsigned get_log_fd(char *fname, unsigned cur_fd) {
+void get_cur_epoch() {
+       int fd;
+       fd = open("/tmp/fprobe_last_epoch",O_RDONLY);
+       if (fd != -1) {
+               char snum[7];
+               ssize_t len;
+               len = read(fd, snum, sizeof(snum)-1);
+               if (len != -1) {
+                       snum[len]='\0';
+                       sscanf(snum,"%d",&cur_epoch);
+                       cur_epoch++; /* Let's not stone the last epoch */
+                       close(fd);
+               }
+       }
+       return;
+}
+
+
+void update_cur_epoch_file(int n) {
+       int fd, len;
+       char snum[7];
+       len=snprintf(snum,6,"%d",n);
+       fd = open("/tmp/fprobe_last_epoch",O_WRONLY|O_CREAT|O_TRUNC);
+       if (fd == -1) {
+               my_log(LOG_ERR, "open() failed: /tmp/fprobe_last_epoch.The next restart will resume logging from epoch id 0.");
+               return;
+       }
+       write(fd, snum, len);
+       close(fd);
+}
+
+unsigned get_log_fd(char *fname, int cur_fd) {
        struct Time now;
        unsigned cur_uptime;
+       /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
+        * doesn't solve the problem */
+
+       struct statfs statfs;
        int ret_fd;
        gettime(&now);
-       cur_uptime = getuptime(&now);
+       cur_uptime = getuptime_minutes(&now);
+
 
-       /* Epoch lenght in minutes */
-       if ((cur_uptime - prev_uptime) > (1000 * 60 * epoch_length)) {
+       if (cur_fd!=START_VALUE) {
+               if (fstatfs(cur_fd, &statfs) == -1) {
+                       my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
+               }
+               else {
+                       if (min_free && (statfs.f_bavail < min_free)) 
+                               switch(cur_epoch) {
+                                       case 0: /* Uh oh. Our first file filled up all of the free space. Just bail out. */
+                                               my_log(LOG_ERR, "The first epoch filled up all the free space on disk. Bailing out.");
+                                               exit(1);
+                                       default:
+                                               my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
+                                               cur_epoch = -1;
+                               }
+               }
+       }
+
+       /* Epoch length in minutes */
+       if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
                char nextname[MAX_PATH_LEN];
                int write_fd;
                prev_uptime = cur_uptime;
                cur_epoch = (cur_epoch + 1) % log_epochs;
-               close(cur_fd);
+               if (cur_fd>0)
+                       close(cur_fd);
                snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
-               if ((write_fd = open(nextname, O_WRONLY|O_CREAT)) < 0) {
-                       fprintf(stderr, "open(): %s (%s)\n", nextname, strerror(errno));
+               if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC)) < 0) {
+                       my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
                        exit(1);
                }
+               update_cur_epoch_file(cur_epoch);
                ret_fd = write_fd;
        }
-       else
+       else {
                ret_fd = cur_fd;
+       }
        return(ret_fd);
 }
 
@@ -530,6 +622,9 @@ void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
 
                        case NETFLOW_IPV4_DST_ADDR:
                                ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
+                               if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
+                                       my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
+                               }
                                p += NETFLOW_IPV4_DST_ADDR_SIZE;
                                break;
 
@@ -629,9 +724,9 @@ void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
                                *((uint8_t *) p) = 0;
                                p += NETFLOW_PAD8_SIZE;
                                break;
-                       case NETFLOW_PLANETLAB_XID:
+                       case NETFLOW_XID:
                                *((uint16_t *) p) = flow->tos;
-                               p += NETFLOW_PLANETLAB_XID_SIZE;
+                               p += NETFLOW_XID_SIZE;
                                break;
                        case NETFLOW_PAD16:
                        /* Unsupported (uint16_t) */
@@ -724,16 +819,18 @@ void *emit_thread()
                        p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
                        size = netflow->HeaderSize + emit_count * netflow->FlowSize;
                        /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
+#ifdef STD_NETFLOW_PDU
                        if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
+#endif
                        peer_rot_cur = 0;
                        for (i = 0; i < npeers; i++) {
-                               if (peers[0].type == PEER_FILE) {
+                               if (peers[i].type == PEER_FILE) {
                                                if (netflow->SeqOffset)
                                                        *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
-#define MESSAGES
-                                               peers[0].write_fd = get_log_fd(peers[0].fname, peers[0].write_fd);
-                                               ret = write(peers[0].write_fd, emit_packet, size);
+                                               peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
+                                               ret = write(peers[i].write_fd, emit_packet, size);
                                                if (ret < size) {
+
 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
                                                        my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
                                                                i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
@@ -746,7 +843,7 @@ void *emit_thread()
                                                                emit_count, i + 1, peers[i].seq);
                                                }
 #endif
-                                               peers[0].seq += emit_count;
+                                               peers[i].seq += emit_count;
 
                                                /* Rate limit */
                                                if (emit_rate_bytes) {
@@ -1015,6 +1112,9 @@ void *cap_thread()
 
                        flow->sip = nl->ip_src;
                        flow->dip = nl->ip_dst;
+                       if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
+                               my_log(LOG_INFO, "Received test flow to corewars.org");
+                       }
                        flow->iif = snmp_index(ulog_msg->indev_name);
                        flow->oif = snmp_index(ulog_msg->outdev_name);
                        flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
@@ -1154,6 +1254,44 @@ void *cap_thread()
        return 0;
 }
 
+/* Copied out of CoDemux */
+
+static int init_daemon() {
+  pid_t pid;
+  FILE *pidfile;
+
+  pidfile = fopen(PIDFILE, "w");
+  if (pidfile == NULL) {
+    my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
+  }
+
+  if ((pid = fork()) < 0) {
+    fclose(pidfile);
+    my_log(LOG_ERR, "Could not fork!\n");
+    return(-1);
+  }
+  else if (pid != 0) {
+    /* i'm the parent, writing down the child pid  */
+    fprintf(pidfile, "%u\n", pid);
+    fclose(pidfile);
+    exit(0);
+  }
+
+  /* close the pid file */
+  fclose(pidfile);
+
+  /* routines for any daemon process
+     1. create a new session 
+     2. change directory to the root
+     3. change the file creation permission 
+  */
+  setsid();
+  chdir("/usr/local/fprobe");
+  umask(0);
+
+  return(0);
+}
+
 int main(int argc, char **argv)
 {
        char errpbuf[512];
@@ -1189,11 +1327,13 @@ int main(int argc, char **argv)
        }
 
        if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
+       if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
        if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
        if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
        if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
        if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
        if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
+       if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
        if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
        if (parms[nflag].count) {
                switch (atoi(parms[nflag].arg)) {
@@ -1371,12 +1511,14 @@ bad_collector:
        else if (parms[fflag].count) {
                // log into a file
                if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
-               if (!(peers[0].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
-               strncpy(peers[0].fname, parms[fflag].arg, MAX_PATH_LEN);
+               if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
+               strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
                
-               peers[0].write_fd = -1;
-               peers[0].type = PEER_FILE;
-               peers[0].seq = 0;
+               peers[npeers].write_fd = START_VALUE;
+               peers[npeers].type = PEER_FILE;
+               peers[npeers].seq = 0;
+
+               get_cur_epoch();
                npeers++;
        }
        else 
@@ -1398,21 +1540,27 @@ bad_collector:
        /* Daemonize (if log destination stdout-free) */
 
        my_log_open(ident, verbosity, log_dest);
-       if (!(log_dest & 2)) {
-               switch (fork()) {
-                       case -1:
-                               fprintf(stderr, "fork(): %s", strerror(errno));
-                               exit(1);
 
-                       case 0:
-                               setsid();
-                               freopen("/dev/null", "r", stdin);
-                               freopen("/dev/null", "w", stdout);
-                               freopen("/dev/null", "w", stderr);
-                               break;
+       init_daemon();
 
-                       default:
-                               exit(0);
+       if (!(log_dest & 2)) {
+               /* Crash-proofing - Sapan*/
+               while (1) {
+                       int pid=fork();
+                       if (pid==-1) {
+                                       fprintf(stderr, "fork(): %s", strerror(errno));
+                                       exit(1);
+                       }
+                       else if (pid==0) {
+                                       setsid();
+                                       freopen("/dev/null", "r", stdin);
+                                       freopen("/dev/null", "w", stdout);
+                                       freopen("/dev/null", "w", stderr);
+                                       break;
+                       }
+                       else {
+                               while (wait3(NULL,0,NULL) < 1);
+                       }
                }
        } else {
                setvbuf(stdout, (char *)0, _IONBF, 0);