X-Git-Url: http://git.onelab.eu/?p=fprobe-ulog.git;a=blobdiff_plain;f=src%2Ffprobe-ulog.c;h=bca3b5d3253759d39e55b033d86cbb4893e04fb3;hp=c744c01c2b261339e54e0b91d16434247a20e8e1;hb=466dcb566b08ec31ac3eb9946ec986b9e9f8c9c1;hpb=48d950c6c672553f11f353b3dd76c7e13dcd16cd diff --git a/src/fprobe-ulog.c b/src/fprobe-ulog.c index c744c01..bca3b5d 100644 --- a/src/fprobe-ulog.c +++ b/src/fprobe-ulog.c @@ -8,7 +8,7 @@ Sapan Bhatia - 7/11/2007 Added data collection (-f) functionality, xid support in the header and log file + 7/11/2007 Added data collection (-f) functionality, slice_id support in the header and log file rotation. 15/11/2007 Added check to make sure fprobe doesn't overflow the disk. Also added a test facility. @@ -217,6 +217,7 @@ static int sched_min, sched_max; static int npeers, npeers_rot; static struct peer *peers; static int sigs; +static char cur_output_file[MAX_PATH_LEN]; static struct Flow *flows[1 << HASH_BITS]; static pthread_mutex_t flows_mutex[1 << HASH_BITS]; @@ -375,7 +376,7 @@ inline void copy_flow(struct Flow *src, struct Flow *dst) dst->sip = src->sip; dst->dip = src->dip; dst->tos = src->tos; - dst->xid = src->xid; + dst->slice_id = src->slice_id; dst->proto = src->proto; dst->tcp_flags = src->tcp_flags; dst->id = src->id; @@ -466,20 +467,24 @@ unsigned get_data_file_fd(char *fname, int cur_fd) { * or we're starting up * or we're going back to the first epoch */ if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) { - char nextname[MAX_PATH_LEN]; int write_fd; prev_uptime = cur_uptime; cur_epoch = (cur_epoch + 1) % log_epochs; if (cur_epoch>last_peak) last_peak = cur_epoch; - if (cur_fd>0) + if (cur_fd>0) { close(cur_fd); - snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch); - if ((write_fd = open(nextname, O_RDWR|O_CREAT|O_TRUNC)) < 0) { - my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno)); + /* Compress the finished file */ + char gzip_cmd[MAX_PATH_LEN+sizeof("gzip -f ")]; + snprintf(gzip_cmd, MAX_PATH_LEN+sizeof("gzip -f "),"gzip -f %s",cur_output_file); + system(gzip_cmd); + } + snprintf(cur_output_file,MAX_PATH_LEN,"%s.%d",fname,cur_epoch); + if ((write_fd = open(cur_output_file, O_RDWR|O_CREAT|O_TRUNC)) < 0) { + my_log(LOG_ERR, "open(): %s (%s)\n", cur_output_file, strerror(errno)); exit(1); } if (fchmod(write_fd,S_IRUSR|S_IWUSR|S_IROTH|S_IRGRP) == -1) { - my_log(LOG_ERR, "fchmod() failed: %s (%s). Continuing...\n", nextname, strerror(errno)); + my_log(LOG_ERR, "fchmod() failed: %s (%s). Continuing...\n", cur_output_file, strerror(errno)); } update_cur_epoch_file(cur_epoch); ret_fd = write_fd; @@ -584,12 +589,12 @@ done: flown->size += flow->size; flown->pkts += flow->pkts; - /* The xid of the first xid of a flow is misleading. Reset the xid of the flow + /* The slice_id of the first slice_id of a flow is misleading. Reset the slice_id of the flow * if a better value comes along. A good example of this is that by the time CoDemux sets the * peercred of a flow, it has already been accounted for here and attributed to root. */ - if (flown->xid<1) - flown->xid = flow->xid; + if (flown->slice_id<1) + flown->slice_id = flow->slice_id; if (flow->flags & FLOW_FRAG) { @@ -762,9 +767,9 @@ void *fill(int fields, uint16_t *format, struct Flow *flow, void *p) *((uint8_t *) p) = 0; p += NETFLOW_PAD8_SIZE; break; - case NETFLOW_XID: - *((uint32_t *) p) = flow->xid; - p += NETFLOW_XID_SIZE; + case NETFLOW_SLICE_ID: + *((uint32_t *) p) = flow->slice_id; + p += NETFLOW_SLICE_ID_SIZE; break; case NETFLOW_PAD16: /* Unsupported (uint16_t) */ @@ -819,6 +824,8 @@ void *emit_thread() timeout.tv_nsec = 0; setuser(); + + //pthread_mutexattr_setprotocol(&md->MutexAttr,PTHREAD_PRIO_INHERIT); for (;;) { pthread_mutex_lock(&emit_mutex); @@ -827,6 +834,7 @@ void *emit_thread() timeout.tv_sec = now.tv_sec + emit_timeout; /* Do not wait until emit_packet will filled - it may be too long */ if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) { + my_log(LOG_INFO,"Timeout: %d, %d",emit_count, timeout.tv_sec); pthread_mutex_unlock(&emit_mutex); goto sendit; } @@ -876,7 +884,7 @@ sendit: #undef MESSAGES } #if ((DEBUG) & DEBUG_E) - commaneelse { + else { my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d", emit_count, i + 1, peers[i].seq); } @@ -911,7 +919,7 @@ sendreal: #endif } #if ((DEBUG) & DEBUG_E) - commaneelse { + else { my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d", emit_count, i + 1, peers[i].seq); } @@ -1156,19 +1164,18 @@ void *cap_thread() /* It's going to be expensive calling this syscall on every flow. * We should keep a local hash table, for now just bear the overhead... - Sapan*/ - flow->xid=0; + flow->slice_id=0; if (ulog_msg->mark > 0) { - flow->xid = get_vhi_name(ulog_msg->mark); - challenge = get_vhi_name(ulog_msg->mark); + flow->slice_id = xid_to_slice_id(ulog_msg->mark); } - if (flow->xid < 1 || flow->xid!=challenge) - flow->xid = ulog_msg->mark; + if (flow->slice_id < 1) + flow->slice_id = ulog_msg->mark; // Couldn't look up the slice id, let's at least store the local xid if ((flow->dip.s_addr == inet_addr("10.0.0.8")) || (flow->sip.s_addr == inet_addr("10.0.0.8"))) { - my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->xid); + my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->slice_id); } flow->iif = snmp_index(ulog_msg->indev_name); flow->oif = snmp_index(ulog_msg->outdev_name); @@ -1627,6 +1634,7 @@ bad_collector: /* Initialization */ + init_slice_id_hash(); hash_init(); /* Actually for crc16 only */ mem_init(sizeof(struct Flow), bulk_quantity, memory_limit); for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);