X-Git-Url: http://git.onelab.eu/?p=fprobe-ulog.git;a=blobdiff_plain;f=src%2Ffprobe-ulog.c;h=bca3b5d3253759d39e55b033d86cbb4893e04fb3;hp=0c573ec937f657050c182810599700e7e5dd3262;hb=466dcb566b08ec31ac3eb9946ec986b9e9f8c9c1;hpb=18e628f7da9b37f43b304a2ee3c05a801c252951 diff --git a/src/fprobe-ulog.c b/src/fprobe-ulog.c index 0c573ec..bca3b5d 100644 --- a/src/fprobe-ulog.c +++ b/src/fprobe-ulog.c @@ -8,7 +8,7 @@ Sapan Bhatia - 7/11/2007 Added data collection (-f) functionality, xid support in the header and log file + 7/11/2007 Added data collection (-f) functionality, slice_id support in the header and log file rotation. 15/11/2007 Added check to make sure fprobe doesn't overflow the disk. Also added a test facility. @@ -37,6 +37,8 @@ #include #include +#include "vserver.h" + struct ipulog_handle { int fd; u_int8_t blocking; @@ -95,7 +97,9 @@ struct ipulog_handle { #include #include -#define PIDFILE "/var/log/fprobe-ulog.pid" +#define PIDFILE "/var/log/fprobe-ulog.pid" +#define LAST_EPOCH_FILE "/var/log/fprobe_last_epoch" +#define MAX_EPOCH_SIZE sizeof("32767") #define STD_NETFLOW_PDU enum { @@ -162,10 +166,10 @@ extern struct NetFlow NetFlow1; extern struct NetFlow NetFlow5; extern struct NetFlow NetFlow7; -#define START_VALUE -5 +#define START_DATA_FD -5 #define mark_is_tos parms[Mflag].count static unsigned scan_interval = 5; -static int min_free = 0; +static unsigned int min_free = 0; static int frag_lifetime = 30; static int inactive_lifetime = 60; static int active_lifetime = 300; @@ -213,6 +217,7 @@ static int sched_min, sched_max; static int npeers, npeers_rot; static struct peer *peers; static int sigs; +static char cur_output_file[MAX_PATH_LEN]; static struct Flow *flows[1 << HASH_BITS]; static pthread_mutex_t flows_mutex[1 << HASH_BITS]; @@ -371,6 +376,7 @@ inline void copy_flow(struct Flow *src, struct Flow *dst) dst->sip = src->sip; dst->dip = src->dip; dst->tos = src->tos; + dst->slice_id = src->slice_id; dst->proto = src->proto; dst->tcp_flags = 
src->tcp_flags; dst->id = src->id; @@ -385,13 +391,15 @@ inline void copy_flow(struct Flow *src, struct Flow *dst) dst->flags = src->flags; } -void get_cur_epoch() { +void read_cur_epoch() { int fd; - fd = open("/tmp/fprobe_last_epoch",O_RDONLY); + /* Reset to -1 in case the read fails */ + cur_epoch=-1; + fd = open(LAST_EPOCH_FILE, O_RDONLY); if (fd != -1) { - char snum[7]; + char snum[MAX_EPOCH_SIZE]; ssize_t len; - len = read(fd, snum, sizeof(snum)-1); + len = read(fd, snum, MAX_EPOCH_SIZE-1); if (len != -1) { snum[len]='\0'; sscanf(snum,"%d",&cur_epoch); @@ -403,32 +411,40 @@ void get_cur_epoch() { } +/* Dumps the current epoch in a file to cope with + * reboots and killings of fprobe */ + void update_cur_epoch_file(int n) { int fd, len; - char snum[7]; - len=snprintf(snum,6,"%d",n); - fd = open("/tmp/fprobe_last_epoch",O_WRONLY|O_CREAT|O_TRUNC); + char snum[MAX_EPOCH_SIZE]; + len=snprintf(snum, MAX_EPOCH_SIZE-1,"%d", n); + fd = open(LAST_EPOCH_FILE, O_RDWR|O_CREAT|O_TRUNC); if (fd == -1) { - my_log(LOG_ERR, "open() failed: /tmp/fprobe_last_epoch.The next restart will resume logging from epoch id 0."); + my_log(LOG_ERR, "open() failed: %s.The next restart will resume logging from epoch id 0.",LAST_EPOCH_FILE); return; } write(fd, snum, len); close(fd); } -unsigned get_log_fd(char *fname, int cur_fd) { +/* Get the file descriptor corresponding to the current file. + * The kludgy implementation is to abstract away the 'current + * file descriptor', which may also be a socket. 
+ */ + +unsigned get_data_file_fd(char *fname, int cur_fd) { struct Time now; unsigned cur_uptime; - /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that - * doesn't solve the problem */ struct statfs statfs; int ret_fd; + + /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that + * doesn't solve the problem */ gettime(&now); cur_uptime = getuptime_minutes(&now); - - if (cur_fd!=START_VALUE) { + if (cur_fd != START_DATA_FD) { if (fstatfs(cur_fd, &statfs) == -1) { my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks"); } @@ -447,20 +463,29 @@ unsigned get_log_fd(char *fname, int cur_fd) { } } - /* Epoch length in minutes */ + /* If epoch length has been exceeded, + * or we're starting up + * or we're going back to the first epoch */ if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) { - char nextname[MAX_PATH_LEN]; int write_fd; prev_uptime = cur_uptime; cur_epoch = (cur_epoch + 1) % log_epochs; if (cur_epoch>last_peak) last_peak = cur_epoch; - if (cur_fd>0) + if (cur_fd>0) { close(cur_fd); - snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch); - if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC)) < 0) { - my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno)); + /* Compress the finished file */ + char gzip_cmd[MAX_PATH_LEN+sizeof("gzip -f ")]; + snprintf(gzip_cmd, MAX_PATH_LEN+sizeof("gzip -f "),"gzip -f %s",cur_output_file); + system(gzip_cmd); + } + snprintf(cur_output_file,MAX_PATH_LEN,"%s.%d",fname,cur_epoch); + if ((write_fd = open(cur_output_file, O_RDWR|O_CREAT|O_TRUNC)) < 0) { + my_log(LOG_ERR, "open(): %s (%s)\n", cur_output_file, strerror(errno)); exit(1); } + if (fchmod(write_fd,S_IRUSR|S_IWUSR|S_IROTH|S_IRGRP) == -1) { + my_log(LOG_ERR, "fchmod() failed: %s (%s). 
Continuing...\n", cur_output_file, strerror(errno)); + } update_cur_epoch_file(cur_epoch); ret_fd = write_fd; } @@ -563,6 +588,15 @@ done: flown->tcp_flags |= flow->tcp_flags; flown->size += flow->size; flown->pkts += flow->pkts; + + /* The slice_id of the first packet of a flow is misleading. Reset the slice_id of the flow + * if a better value comes along. A good example of this is that by the time CoDemux sets the + * peercred of a flow, it has already been accounted for here and attributed to root. */ + + if (flown->slice_id<1) + flown->slice_id = flow->slice_id; + + if (flow->flags & FLOW_FRAG) { /* Fragmented flow require some additional work */ if (flow->flags & FLOW_TL) { @@ -609,6 +643,8 @@ done: return ret; } +int onlyonce=0; + void *fill(int fields, uint16_t *format, struct Flow *flow, void *p) { int i; @@ -625,7 +661,7 @@ void *fill(int fields, uint16_t *format, struct Flow *flow, void *p) case NETFLOW_IPV4_DST_ADDR: ((struct in_addr *) p)->s_addr = flow->dip.s_addr; - if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) { + if ((flow->dip.s_addr == inet_addr("10.0.0.8"))) { my_log(LOG_INFO, "Created records for test flow. No.
of packets=%d",flow->pkts); } p += NETFLOW_IPV4_DST_ADDR_SIZE; @@ -724,12 +760,16 @@ void *fill(int fields, uint16_t *format, struct Flow *flow, void *p) case NETFLOW_FLAGS7_1: case NETFLOW_SRC_MASK: case NETFLOW_DST_MASK: + if (onlyonce) { + my_log(LOG_CRIT, "Adding SRC/DST masks: this version of fprobe is seriously broken\n"); + onlyonce=1; + } *((uint8_t *) p) = 0; p += NETFLOW_PAD8_SIZE; break; - case NETFLOW_XID: - *((uint16_t *) p) = flow->tos; - p += NETFLOW_XID_SIZE; + case NETFLOW_SLICE_ID: + *((uint32_t *) p) = flow->slice_id; + p += NETFLOW_SLICE_ID_SIZE; break; case NETFLOW_PAD16: /* Unsupported (uint16_t) */ @@ -784,6 +824,8 @@ void *emit_thread() timeout.tv_nsec = 0; setuser(); + + //pthread_mutexattr_setprotocol(&md->MutexAttr,PTHREAD_PRIO_INHERIT); for (;;) { pthread_mutex_lock(&emit_mutex); @@ -792,6 +834,7 @@ void *emit_thread() timeout.tv_sec = now.tv_sec + emit_timeout; /* Do not wait until emit_packet will filled - it may be too long */ if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) { + my_log(LOG_INFO,"Timeout: %d, %d",emit_count, timeout.tv_sec); pthread_mutex_unlock(&emit_mutex); goto sendit; } @@ -830,7 +873,7 @@ sendit: if (peers[i].type == PEER_FILE) { if (netflow->SeqOffset) *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq); - peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd); + peers[i].write_fd = get_data_file_fd(peers[i].fname, peers[i].write_fd); ret = write(peers[i].write_fd, emit_packet, size); if (ret < size) { @@ -841,7 +884,7 @@ sendit: #undef MESSAGES } #if ((DEBUG) & DEBUG_E) - commaneelse { + else { my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d", emit_count, i + 1, peers[i].seq); } @@ -876,7 +919,7 @@ sendreal: #endif } #if ((DEBUG) & DEBUG_E) - commaneelse { + else { my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d", emit_count, i + 1, peers[i].seq); } @@ -1055,6 +1098,7 @@ void *cap_thread() char buf[64]; char logbuf[256]; #endif + int 
challenge; setuser(); @@ -1116,8 +1160,22 @@ void *cap_thread() flow->sip = nl->ip_src; flow->dip = nl->ip_dst; flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos; - if ((flow->dip.s_addr == inet_addr("64.34.177.39")) || (flow->sip.s_addr == inet_addr("64.34.177.39"))) { - my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->tos); + + /* It's going to be expensive calling this syscall on every flow. + * We should keep a local hash table, for now just bear the overhead... - Sapan*/ + + flow->slice_id=0; + + if (ulog_msg->mark > 0) { + flow->slice_id = xid_to_slice_id(ulog_msg->mark); + } + + if (flow->slice_id < 1) + flow->slice_id = ulog_msg->mark; // Couldn't look up the slice id, let's at least store the local xid + + + if ((flow->dip.s_addr == inet_addr("10.0.0.8")) || (flow->sip.s_addr == inet_addr("10.0.0.8"))) { + my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->slice_id); } flow->iif = snmp_index(ulog_msg->indev_name); flow->oif = snmp_index(ulog_msg->outdev_name); @@ -1517,11 +1575,11 @@ bad_collector: if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc; strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN); - peers[npeers].write_fd = START_VALUE; + peers[npeers].write_fd = START_DATA_FD; peers[npeers].type = PEER_FILE; peers[npeers].seq = 0; - get_cur_epoch(); + read_cur_epoch(); npeers++; } else @@ -1576,6 +1634,7 @@ bad_collector: /* Initialization */ + init_slice_id_hash(); hash_init(); /* Actually for crc16 only */ mem_init(sizeof(struct Flow), bulk_quantity, memory_limit); for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);