X-Git-Url: http://git.onelab.eu/?p=fprobe-ulog.git;a=blobdiff_plain;f=src%2Ffprobe-ulog.c;h=e939dabf7464fd6b7795c69913c563c9f4347f5f;hp=0896cb58d73ffa3455ad64d82366979bdf66878c;hb=ba0903e394f2aff6a29d19576f99dfab4c92ae69;hpb=f42d3718eaa93d9efb2cfcb7cb455d7b8f4bc775 diff --git a/src/fprobe-ulog.c b/src/fprobe-ulog.c index 0896cb5..e939dab 100644 --- a/src/fprobe-ulog.c +++ b/src/fprobe-ulog.c @@ -37,7 +37,7 @@ #include #include -#include "vserver.h" +/* #include "vserver.h" */ struct ipulog_handle { int fd; @@ -217,6 +217,7 @@ static int sched_min, sched_max; static int npeers, npeers_rot; static struct peer *peers; static int sigs; +static char cur_output_file[MAX_PATH_LEN]; static struct Flow *flows[1 << HASH_BITS]; static pthread_mutex_t flows_mutex[1 << HASH_BITS]; @@ -417,7 +418,7 @@ void update_cur_epoch_file(int n) { int fd, len; char snum[MAX_EPOCH_SIZE]; len=snprintf(snum, MAX_EPOCH_SIZE-1,"%d", n); - fd = open(LAST_EPOCH_FILE, O_RDWR|O_CREAT|O_TRUNC); + fd = open(LAST_EPOCH_FILE, O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH); if (fd == -1) { my_log(LOG_ERR, "open() failed: %s.The next restart will resume logging from epoch id 0.",LAST_EPOCH_FILE); return; @@ -466,20 +467,24 @@ unsigned get_data_file_fd(char *fname, int cur_fd) { * or we're starting up * or we're going back to the first epoch */ if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) { - char nextname[MAX_PATH_LEN]; int write_fd; prev_uptime = cur_uptime; cur_epoch = (cur_epoch + 1) % log_epochs; if (cur_epoch>last_peak) last_peak = cur_epoch; - if (cur_fd>0) + if (cur_fd>0) { close(cur_fd); - snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch); - if ((write_fd = open(nextname, O_RDWR|O_CREAT|O_TRUNC)) < 0) { - my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno)); + /* Compress the finished file */ + char gzip_cmd[MAX_PATH_LEN+sizeof("gzip -f ")]; + snprintf(gzip_cmd, MAX_PATH_LEN+sizeof("gzip -f "),"gzip -f %s",cur_output_file); + system(gzip_cmd); + } + snprintf(cur_output_file,MAX_PATH_LEN,"%s.%d",fname,cur_epoch); + if ((write_fd = open(cur_output_file, O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH)) < 0) { + my_log(LOG_ERR, "open(): %s (%s)\n", cur_output_file, strerror(errno)); exit(1); } if (fchmod(write_fd,S_IRUSR|S_IWUSR|S_IROTH|S_IRGRP) == -1) { - my_log(LOG_ERR, "fchmod() failed: %s (%s). Continuing...\n", nextname, strerror(errno)); + my_log(LOG_ERR, "fchmod() failed: %s (%s). Continuing...\n", cur_output_file, strerror(errno)); } update_cur_epoch_file(cur_epoch); ret_fd = write_fd; @@ -819,14 +824,18 @@ void *emit_thread() timeout.tv_nsec = 0; setuser(); - + for (;;) { pthread_mutex_lock(&emit_mutex); while (!flows_emit) { gettimeofday(&now, 0); timeout.tv_sec = now.tv_sec + emit_timeout; /* Do not wait until emit_packet will filled - it may be too long */ - if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) { + int res=-1; + while ((res=pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout))==-1) continue; + + if (res && emit_count) { + //my_log(LOG_INFO,"Timeout: %d, %d",emit_count, timeout.tv_sec); pthread_mutex_unlock(&emit_mutex); goto sendit; } @@ -876,7 +885,7 @@ sendit: #undef MESSAGES } #if ((DEBUG) & DEBUG_E) - commaneelse { + else { my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d", emit_count, i + 1, peers[i].seq); } @@ -911,7 +920,7 @@ sendreal: #endif } #if ((DEBUG) & DEBUG_E) - commaneelse { + else { my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d", emit_count, i + 1, peers[i].seq); } @@ -958,7 +967,8 @@ void *unpending_thread() while (!(pending_tail->flags & FLOW_PENDING)) { gettimeofday(&now, 0); timeout.tv_sec = now.tv_sec + unpending_timeout; - pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout); + while (pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout)==-1) + continue; } #if ((DEBUG) & (DEBUG_S | DEBUG_U)) @@ -1004,7 +1014,8 @@ void *scan_thread() for (;;) { gettime(&now); timeout.tv_sec = now.sec + scan_interval; - pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout); + while (pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout)==-1) + continue; gettime(&now); #if ((DEBUG) & DEBUG_S) @@ -1156,14 +1167,10 @@ void *cap_thread() /* It's going to be expensive calling this syscall on every flow. * We should keep a local hash table, for now just bear the overhead... - Sapan*/ - flow->slice_id=0; - - if (ulog_msg->mark > 0) { - flow->slice_id = xid_to_slice_id(ulog_msg->mark); - } + flow->slice_id = ulog_msg->mark; - if (flow->slice_id < 1) - flow->slice_id = ulog_msg->mark; // Couldn't look up the slice id, let's at least store the local xid + /*if (flow->slice_id < 1) + flow->slice_id = ulog_msg->mark; // Couldn't look up the slice id, let's at least store the local xid*/ if ((flow->dip.s_addr == inet_addr("10.0.0.8")) || (flow->sip.s_addr == inet_addr("10.0.0.8"))) { @@ -1498,7 +1505,7 @@ bad_lhost: /* Process collectors parameters. Brrrr... :-[ */ npeers = argc - optind; - if (npeers > 1) { + if (npeers >= 1) { /* Send to remote Netflow collector */ if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc; for (i = optind, npeers = 0; i < argc; i++, npeers++) { @@ -1563,10 +1570,8 @@ bad_collector: } else if (parms[fflag].count) { // log into a file - if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc; - if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc; - strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN); - + if (!(peers = malloc(sizeof(struct peer)))) goto err_malloc; + if (!(peers[npeers].fname = strndup(parms[fflag].arg, MAX_PATH_LEN))) goto err_malloc; peers[npeers].write_fd = START_DATA_FD; peers[npeers].type = PEER_FILE; peers[npeers].seq = 0; @@ -1626,7 +1631,7 @@ bad_collector: /* Initialization */ - init_slice_id_hash(); + // init_slice_id_hash(); hash_init(); /* Actually for crc16 only */ mem_init(sizeof(struct Flow), bulk_quantity, memory_limit); for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);