X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Ffprobe-ulog.c;h=3edf32af5af67719c880c5767a382b466354a29c;hb=6da4b31660254b81a2506756b1ca9e93e9411222;hp=0d84804c667195f230c38245b1af2d580b6c8ae6;hpb=ccbb7ed928d2eac2269f72abbc931b13060dfda7;p=fprobe-ulog.git diff --git a/src/fprobe-ulog.c b/src/fprobe-ulog.c index 0d84804..3edf32a 100644 --- a/src/fprobe-ulog.c +++ b/src/fprobe-ulog.c @@ -111,6 +111,7 @@ enum { Uflag, uflag, vflag, + Wflag, Xflag, }; @@ -257,7 +258,9 @@ void usage() "-y \tAddress of the NetFlow collector\n" "-f \tFile to write data into\n" "-T \tRotate log file every n epochs\n" - "-E <[1..60]>\tSize of an epoch in minutes\n", + "-W \tSet current epoch to n. Useful when restarting fprobe\n" + "-E <[1..60]>\tSize of an epoch in minutes\n" + , VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max); exit(0); } @@ -367,6 +370,19 @@ inline void copy_flow(struct Flow *src, struct Flow *dst) dst->flags = src->flags; } +void update_cur_epoch_file(int n) { + int fd, len; + char snum[7]; + len=snprintf(snum,6,"%d",n); + fd = open("/tmp/fprobe_last_epoch",O_WRONLY|O_CREAT); + if (fd == -1) { + my_log(LOG_ERR, "open() failed: /tmp/fprobe_last_epoch"); + return; + } + write(fd, snum, len); + close(fd); +} + unsigned get_log_fd(char *fname, unsigned cur_fd) { struct Time now; unsigned cur_uptime; @@ -382,10 +398,11 @@ unsigned get_log_fd(char *fname, unsigned cur_fd) { cur_epoch = (cur_epoch + 1) % log_epochs; close(cur_fd); snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch); - if ((write_fd = open(nextname, O_WRONLY|O_CREAT)) < 0) { - fprintf(stderr, "open(): %s (%s)\n", nextname, strerror(errno)); + if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC)) < 0) { + my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno)); exit(1); } + update_cur_epoch_file(cur_epoch); ret_fd = write_fd; } else @@ -752,8 +769,9 @@ void *emit_thread() if (netflow->SeqOffset) *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq); peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd); - ret = write(peers[0].write_fd, emit_packet, size); + ret = write(peers[i].write_fd, emit_packet, size); if (ret < size) { + #if ((DEBUG) & DEBUG_E) || defined MESSAGES my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s", i + 1, peers[i].seq, emit_count, size, ret, strerror(errno)); @@ -766,7 +784,7 @@ void *emit_thread() emit_count, i + 1, peers[i].seq); } #endif - peers[0].seq += emit_count; + peers[i].seq += emit_count; /* Rate limit */ if (emit_rate_bytes) { @@ -1212,6 +1230,7 @@ int main(int argc, char **argv) } if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg); + if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg); if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg); if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg); if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);