/* See the DRL-LICENSE file for this file's software license. */ #include #include #include #include #include #include #include "common_accounting.h" #include "samplehold.h" static int match(const key_flow *key, const sampled_flow *flow) { if (flow->state != FLOW_USED) return 0; if (key->source_ip != flow->source_ip) return 0; if (key->dest_ip != flow->dest_ip) return 0; if (key->source_port != flow->source_port) return 0; if (key->dest_port != flow->dest_port) return 0; if (key->protocol != flow->protocol) return 0; return 1; } static void get_key(key_flow *key, sampled_flow *flow) { key->source_ip = flow->source_ip; key->dest_ip = flow->dest_ip; key->source_port = flow->source_port; key->dest_port = flow->dest_port; key->protocol = flow->protocol; key->packet_size = 0; } static void move_flow(sampled_flow *dest, sampled_flow *src) { memmove(dest, src, sizeof(sampled_flow)); memset(src, 0, sizeof(sampled_flow)); } uint32_t sampled_table_size(const sampled_flow_table table) { return table->size; } /* * Notes to myself... * * max_bytes is the maximum number of bytes that can pass though DURING THE * MEASUREMENT INTERVAL. So, if you can have 100 Mbit/s and your measurement * interval is 1/10 of a second, your max_bytes is 10Mbit because that's all * you can transfer in 1/10 of a second. * * flow_percentage is the percentage of max_bytes that is considered an * interesting flow. * * oversampling factor is a knob that tunes how accurate our results are at * the cost of additional state/memory. */ sampled_flow_table sampled_table_create(uint32_t (*hash_function)(const key_flow *key), const uint32_t max_bytes, const uint32_t flow_percentage, const uint32_t oversampling_factor, common_accounting_t *common) { sampled_flow_table table = malloc(sizeof(struct sampled_flow_table)); double base_size = (double) 100 / (double) flow_percentage; if (table == NULL) { return NULL; } table->capacity = (uint32_t) ((base_size * oversampling_factor) * 1.03); table->size = 0; table->hash_function = hash_function; table->sample_prob = (double) (((double) table->capacity / (double) max_bytes) * (double) RANDOM_GRANULARITY); table->threshold = (double) ((double) flow_percentage / 100) * max_bytes; table->largest = NULL; table->backing = malloc(sizeof(sampled_flow) * table->capacity); if (table->backing == NULL) { free(table); return NULL; } memset(table->backing, 0, sizeof(sampled_flow) * table->capacity); srand(time(NULL)); table->common = common; gettimeofday(&table->common->last_update, NULL); return table; } void sampled_table_destroy(sampled_flow_table table) { free(table->backing); free(table); } sampled_flow *sampled_table_lookup(sampled_flow_table table, const key_flow *key) { uint32_t hash = table->hash_function(key) % table->capacity; uint32_t location = hash; do { if (table->backing[location].state == FLOW_FREE) { /* It ain't here... */ return NULL; } if (match(key, &table->backing[location])) { /* Got it! */ return &table->backing[location]; } location++; if (location == table->capacity) { location = 0; } } while (location != hash); return NULL; } int sampled_table_sample(sampled_flow_table table, const key_flow *key) { sampled_flow *lookup = sampled_table_lookup(table, key); int random_number; double packet_prob; /* First we update the common accouting information so that we have accurate * aggregate information. */ table->common->bytes_since += key->packet_size; /* Below here we're dealing with individual flows. */ /* It's already in the table, update it. */ if (lookup != NULL) { lookup->bytes += key->packet_size; return 1; } /* It's not in the table, probabilistically sample it. */ packet_prob = table->sample_prob * (double) key->packet_size; random_number = rand() % RANDOM_GRANULARITY; if (random_number < packet_prob) { /* It's being sampled - add it to the table. */ uint32_t hash = table->hash_function(key) % table->capacity; uint32_t location = hash; do { if (table->backing[location].state == FLOW_FREE || table->backing[location].state == FLOW_DELETED) { lookup = &table->backing[location]; break; } location++; if (location == table->capacity) { location = 0; } } while (location != hash); if (lookup == NULL) { /* Table is full!?! */ printf("Full table!\n"); return 0; } table->size += 1; lookup->bytes = key->packet_size; lookup->source_ip = key->source_ip; lookup->dest_ip = key->dest_ip; lookup->source_port = key->source_port; lookup->dest_port = key->dest_port; lookup->protocol = key->protocol; lookup->state = FLOW_USED; lookup->last_bytes = 0; lookup->rate = 0; gettimeofday(&lookup->last_update, NULL); return 1; } /* Not sampled. */ return 0; } int sampled_table_cleanup(sampled_flow_table table) { /* This should... * 1) Remove "small" flows from the table. * 2) Compact the table so that the remaining flows are closer to their * hash locations. * 3) Reset the state of deleted flows to free. */ /* How it might work... * 1) Scan through the backing array. * 2) If the flow is small, memset it to 0. * It it's large, add it to a linked list. * 3) For all items in the linked list, hash them and put them in the * correct location. */ /* For now though, we're going to do it the inefficient way and loop * through the backing twice. */ int i; /* Clear small items. */ for (i = 0; i < table->capacity; ++i) { if (table->backing[i].state == FLOW_USED && table->backing[i].bytes > table->threshold) { /* It gets to stick around. */ } else { /* It dies... */ memset(&table->backing[i], 0, sizeof(sampled_flow)); } } /* Compact the table and put things closer to their hash locations. */ for (i = 0; i < table->capacity; ++i) { if (table->backing[i].state == FLOW_USED) { uint32_t hash; key_flow key; get_key(&key, &table->backing[i]); hash = table->hash_function(&key) % table->capacity; if (i == hash) { /* Already in the best place */ table->backing[i].bytes = 0; table->backing[i].last_bytes = 0; table->backing[i].rate = 0; } else { uint32_t location = hash; do { if (table->backing[location].state == FLOW_FREE) { move_flow(&table->backing[location], &table->backing[i]); table->backing[location].bytes = 0; table->backing[location].last_bytes = 0; table->backing[location].rate = 0; break; } location++; if (location == table->capacity) { location = 0; } } while (location != hash); } } } table->largest = NULL; return 0; } void sampled_table_update_flows(sampled_flow_table table, struct timeval now, double ewma_weight) { int i = 0; uint32_t largest_rate = 0; uint32_t rate_delta = 0; double time_delta = 0; double unweighted_rate = 0; /* Update common aggregate information. */ time_delta = timeval_subtract(now, table->common->last_update); if (time_delta <= 0) { unweighted_rate = 0; } else { unweighted_rate = table->common->bytes_since / time_delta; } table->common->last_inst_rate = table->common->inst_rate; table->common->inst_rate = unweighted_rate; table->common->last_rate = table->common->rate; /* If the rate is zero, then we don't know anything yet. Don't apply EWMA * in that case. */ if (table->common->rate == 0) { table->common->rate = unweighted_rate; } else { table->common->rate = table->common->rate * ewma_weight + unweighted_rate * (1 - ewma_weight); } table->common->bytes_since = 0; table->common->last_update = now; /* Update per-flow information. */ table->largest = &table->backing[i]; largest_rate = table->backing[i].rate; for (i = 0; i < table->capacity; ++i) { if (table->backing[i].state == FLOW_USED) { rate_delta = table->backing[i].bytes - table->backing[i].last_bytes; time_delta = timeval_subtract(now, table->backing[i].last_update); /* Calculate the unweighted rate. Be careful not to divide by * something silly. */ if (time_delta <= 0) { unweighted_rate = 0; } else { unweighted_rate = rate_delta / time_delta; } if (table->backing[i].rate == 0) { table->backing[i].rate = unweighted_rate; } else { table->backing[i].rate = (table->backing[i].rate * ewma_weight + unweighted_rate * (1 - ewma_weight)); } table->backing[i].last_bytes = table->backing[i].bytes; table->backing[i].last_update = now; if (table->backing[i].rate > largest_rate) { largest_rate = table->backing[i].rate; table->largest = &table->backing[i]; } } } table->common->max_flow_rate = largest_rate; } sampled_flow *sampled_table_largest(sampled_flow_table table) { return table->largest; }