Split up log file into epochs so that we can throw away old data.
authorsapanb <sapanb@8c455092-636d-4788-adf5-e71def0336e8>
Mon, 5 Nov 2007 04:38:58 +0000 (04:38 +0000)
committersapanb <sapanb@8c455092-636d-4788-adf5-e71def0336e8>
Mon, 5 Nov 2007 04:38:58 +0000 (04:38 +0000)
git-svn-id: http://svn.planet-lab.org/svn/fprobe-ulog/trunk@5623 8c455092-636d-4788-adf5-e71def0336e8

src/fprobe-ulog.c
src/fprobe-ulog.h

index cdcf170..91a9fd1 100644 (file)
@@ -90,6 +90,7 @@ enum {
        cflag,
        dflag,
        eflag,
+       Eflag,
        fflag,
        gflag,
        hflag,
@@ -101,6 +102,7 @@ enum {
        rflag,
        sflag,
        tflag,
+       Tflag,
        Uflag,
        uflag,
        vflag,
@@ -114,6 +116,7 @@ static struct getopt_parms parms[] = {
        {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
        {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
        {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
+       {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
        {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
        {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
        {'h', 0, 0, 0},
@@ -125,6 +128,7 @@ static struct getopt_parms parms[] = {
        {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
        {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
        {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
+       {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
        {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
        {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
        {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
@@ -152,6 +156,10 @@ static int sockbufsize;
 #else
 #define BULK_QUANTITY 200
 #endif
+
+static unsigned epoch_length=60, log_epochs=1; 
+static unsigned cur_epoch=0,prev_uptime=0;
+
 static unsigned bulk_quantity = BULK_QUANTITY;
 static unsigned pending_queue_length = 100;
 static struct NetFlow *netflow = &NetFlow5;
@@ -243,6 +251,8 @@ void usage()
                "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
                "-y <remote:port>\tAddress of the NetFlow collector\n",
                "-f <writable file>\tFile to write data into\n"
+               "-T <n>\tRotate log file every n epochs\n"
+               "-E <[1..60]>\tSize of an epoch in minutes\n"
                VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
        exit(0);
 }
@@ -339,6 +349,30 @@ inline void copy_flow(struct Flow *src, struct Flow *dst)
        dst->flags = src->flags;
 }
 
+unsigned get_log_fd(char *fname, unsigned cur_fd) {
+       struct Time now;
+       unsigned cur_uptime;
+       int ret_fd;
+       gettime(&now);
+       cur_uptime = getuptime(&now);
+       if ((cur_uptime - prev_uptime) > (1000 * epoch_length)) {
+               char nextname[MAX_PATH_LEN];
+               int write_fd;
+               prev_uptime = cur_uptime;
+               cur_epoch = (cur_epoch + 1) % log_epochs;
+               close(cur_fd);
+               snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
+               if ((write_fd = open(nextname, O_WRONLY|O_CREAT)) < 0) {
+                       fprintf(stderr, "open(): %s (%s)\n", nextname, strerror(errno));
+                       exit(1);
+               }
+               ret_fd = write_fd;
+       }
+       else
+               ret_fd = cur_fd;
+       return(cur_fd);
+}
+
 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
 {
        struct Flow **flowpp;
@@ -692,6 +726,7 @@ void *emit_thread()
                                                if (netflow->SeqOffset)
                                                        *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
 #define MESSAGES
+                                               peers[0].write_fd = get_log_fd(peers[0].fname, peers[0].write_fd);
                                                ret = write(peers[0].write_fd, emit_packet, size);
                                                if (ret < size) {
 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
@@ -1330,12 +1365,10 @@ bad_collector:
                // log into a file
                char *fname;
                if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
-               fname = parms[fflag].arg;
-               if ((write_fd = open(fname, O_WRONLY|O_CREAT)) < 0) {
-                       fprintf(stderr, "open(): %s (%s)\n", fname, strerror(errno));
-                       exit(1);
-               }
-               peers[0].write_fd = write_fd;
+               if (!(fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
+               strncpy(peers[0].fname, parms[fflag].arg, MAX_PATH_LEN);
+               
+               peers[0].write_fd = -1;
                peers[0].type = PEER_FILE;
                peers[0].seq = 0;
                npeers++;
index 0d812c5..db76d23 100644 (file)
@@ -16,6 +16,9 @@
 #define IP_OFFMASK 0x1fff
 #endif
 
+#define MAX_PATH_LEN 255
+#define MAX_DIGIT 5
+
 #define SCHED SCHED_FIFO
 #define EMIT_TIMEOUT 5
 #define UNPENDING_TIMEOUT 5
@@ -111,6 +114,7 @@ struct Flow_TL {
 
 struct peer {
        int write_fd;
+       char *fname;
        struct sockaddr_in addr;
        struct sockaddr_in laddr;
        int type;