Make fprobe compress the collected flow logs.
authorsapanb <sapanb@8c455092-636d-4788-adf5-e71def0336e8>
Fri, 8 Jul 2011 14:55:39 +0000 (14:55 +0000)
committersapanb <sapanb@8c455092-636d-4788-adf5-e71def0336e8>
Fri, 8 Jul 2011 14:55:39 +0000 (14:55 +0000)
git-svn-id: http://svn.planet-lab.org/svn/fprobe-ulog/trunk@18792 8c455092-636d-4788-adf5-e71def0336e8

src/fprobe-ulog.c

index 0896cb5..bca3b5d 100644 (file)
@@ -217,6 +217,7 @@ static int sched_min, sched_max;
 static int npeers, npeers_rot;
 static struct peer *peers;
 static int sigs;
+static char cur_output_file[MAX_PATH_LEN];
 
 static struct Flow *flows[1 << HASH_BITS];
 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
@@ -466,20 +467,24 @@ unsigned get_data_file_fd(char *fname, int cur_fd) {
         * or we're starting up 
         * or we're going back to the first epoch */
        if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
-               char nextname[MAX_PATH_LEN];
                int write_fd;
                prev_uptime = cur_uptime;
                cur_epoch = (cur_epoch + 1) % log_epochs;
                if (cur_epoch>last_peak) last_peak = cur_epoch;
-               if (cur_fd>0)
+               if (cur_fd>0) {
                        close(cur_fd);
-               snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
-               if ((write_fd = open(nextname, O_RDWR|O_CREAT|O_TRUNC)) < 0) {
-                       my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
+            /* Compress the finished file */
+            char gzip_cmd[MAX_PATH_LEN+sizeof("gzip -f ")]; 
+            snprintf(gzip_cmd, MAX_PATH_LEN+sizeof("gzip -f "),"gzip -f %s",cur_output_file);
+            system(gzip_cmd);
+        }
+               snprintf(cur_output_file,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
+               if ((write_fd = open(cur_output_file, O_RDWR|O_CREAT|O_TRUNC)) < 0) {
+                       my_log(LOG_ERR, "open(): %s (%s)\n", cur_output_file, strerror(errno));
                        exit(1);
                }
                if (fchmod(write_fd,S_IRUSR|S_IWUSR|S_IROTH|S_IRGRP) == -1) {
-                       my_log(LOG_ERR, "fchmod() failed: %s (%s). Continuing...\n", nextname, strerror(errno));
+                       my_log(LOG_ERR, "fchmod() failed: %s (%s). Continuing...\n", cur_output_file, strerror(errno));
                }
                update_cur_epoch_file(cur_epoch);
                ret_fd = write_fd;
@@ -819,6 +824,8 @@ void *emit_thread()
        timeout.tv_nsec = 0;
 
        setuser();
+        
+    //pthread_mutexattr_setprotocol(&md->MutexAttr,PTHREAD_PRIO_INHERIT);
 
        for (;;) {
                pthread_mutex_lock(&emit_mutex);
@@ -827,6 +834,7 @@ void *emit_thread()
                        timeout.tv_sec = now.tv_sec + emit_timeout;
                        /* Do not wait until emit_packet will filled - it may be too long */
                        if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
+                my_log(LOG_INFO,"Timeout: %d, %d",emit_count, timeout.tv_sec);
                                pthread_mutex_unlock(&emit_mutex);
                                goto sendit;
                        }
@@ -876,7 +884,7 @@ sendit:
 #undef MESSAGES
                                        }
 #if ((DEBUG) & DEBUG_E)
-                                       commaneelse {
+                                       else {
                                                my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
                                                                emit_count, i + 1, peers[i].seq);
                                        }
@@ -911,7 +919,7 @@ sendreal:
 #endif
                                                                }
 #if ((DEBUG) & DEBUG_E)
-                                                               commaneelse {
+                                                               else {
                                                                        my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
                                                                                        emit_count, i + 1, peers[i].seq);
                                                                }