From 466dcb566b08ec31ac3eb9946ec986b9e9f8c9c1 Mon Sep 17 00:00:00 2001 From: sapanb Date: Fri, 8 Jul 2011 14:55:39 +0000 Subject: [PATCH] Make fprobe compress the collected flow logs. git-svn-id: http://svn.planet-lab.org/svn/fprobe-ulog/trunk@18792 8c455092-636d-4788-adf5-e71def0336e8 --- src/fprobe-ulog.c | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/src/fprobe-ulog.c b/src/fprobe-ulog.c index 0896cb5..bca3b5d 100644 --- a/src/fprobe-ulog.c +++ b/src/fprobe-ulog.c @@ -217,6 +217,7 @@ static int sched_min, sched_max; static int npeers, npeers_rot; static struct peer *peers; static int sigs; +static char cur_output_file[MAX_PATH_LEN]; static struct Flow *flows[1 << HASH_BITS]; static pthread_mutex_t flows_mutex[1 << HASH_BITS]; @@ -466,20 +467,24 @@ unsigned get_data_file_fd(char *fname, int cur_fd) { * or we're starting up * or we're going back to the first epoch */ if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) { - char nextname[MAX_PATH_LEN]; int write_fd; prev_uptime = cur_uptime; cur_epoch = (cur_epoch + 1) % log_epochs; if (cur_epoch>last_peak) last_peak = cur_epoch; - if (cur_fd>0) + if (cur_fd>0) { close(cur_fd); - snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch); - if ((write_fd = open(nextname, O_RDWR|O_CREAT|O_TRUNC)) < 0) { - my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno)); + /* Compress the finished file */ + char gzip_cmd[MAX_PATH_LEN+sizeof("gzip -f ")]; + snprintf(gzip_cmd, MAX_PATH_LEN+sizeof("gzip -f "),"gzip -f %s",cur_output_file); + system(gzip_cmd); + } + snprintf(cur_output_file,MAX_PATH_LEN,"%s.%d",fname,cur_epoch); + if ((write_fd = open(cur_output_file, O_RDWR|O_CREAT|O_TRUNC)) < 0) { + my_log(LOG_ERR, "open(): %s (%s)\n", cur_output_file, strerror(errno)); exit(1); } if (fchmod(write_fd,S_IRUSR|S_IWUSR|S_IROTH|S_IRGRP) == -1) { - my_log(LOG_ERR, "fchmod() failed: %s (%s). Continuing...\n", nextname, strerror(errno)); + my_log(LOG_ERR, "fchmod() failed: %s (%s). Continuing...\n", cur_output_file, strerror(errno)); } update_cur_epoch_file(cur_epoch); ret_fd = write_fd; @@ -819,6 +824,8 @@ void *emit_thread() timeout.tv_nsec = 0; setuser(); + + //pthread_mutexattr_setprotocol(&md->MutexAttr,PTHREAD_PRIO_INHERIT); for (;;) { pthread_mutex_lock(&emit_mutex); @@ -827,6 +834,7 @@ void *emit_thread() timeout.tv_sec = now.tv_sec + emit_timeout; /* Do not wait until emit_packet will filled - it may be too long */ if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) { + my_log(LOG_INFO,"Timeout: %d, %d",emit_count, timeout.tv_sec); pthread_mutex_unlock(&emit_mutex); goto sendit; } @@ -876,7 +884,7 @@ sendit: #undef MESSAGES } #if ((DEBUG) & DEBUG_E) - commaneelse { + else { my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d", emit_count, i + 1, peers[i].seq); } @@ -911,7 +919,7 @@ sendreal: #endif } #if ((DEBUG) & DEBUG_E) - commaneelse { + else { my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d", emit_count, i + 1, peers[i].seq); } -- 2.43.0