X-Git-Url: http://git.onelab.eu/?p=distributedratelimiting.git;a=blobdiff_plain;f=drl%2Festimate.c;h=21edcbf0a5ae8de9d6c2570bc0c8ea40112fd37f;hp=e42d7a6f83d229630e1e75e90253f7da753471b8;hb=f83340496f632165030cc92cd98408a87082f6b1;hpb=74f52acf84cbf11faab8aa53e069464063ce11b9 diff --git a/drl/estimate.c b/drl/estimate.c index e42d7a6..21edcbf 100644 --- a/drl/estimate.c +++ b/drl/estimate.c @@ -16,12 +16,10 @@ #include "raterouter.h" #include "util.h" #include "ratetypes.h" /* needs util and pthread.h */ +#include "calendar.h" #include "logging.h" -#define PRINT_COUNTER_RESET (0) - extern uint8_t system_loglevel; -static int printcounter = PRINT_COUNTER_RESET - 1; uint8_t do_enforcement = 0; @@ -30,15 +28,30 @@ uint8_t do_enforcement = 0; * to estimate the current aggregate rate and the rate of the individual flows * in the table. */ -static void estimate(identity_t *ident) { +static void estimate(identity_t *ident, const double estintms) { struct timeval now; + double time_difference; + pthread_mutex_lock(&ident->table_mutex); /* CLUNK ! */ + gettimeofday(&now, NULL); - pthread_mutex_lock(&ident->table_mutex); /* CLUNK ! */ + time_difference = timeval_subtract(now, ident->common.last_update); + + if (time_difference > 1.05 * (estintms / 1000 * ident->mainloop_intervals)) { + printlog(LOG_WARN, "Missed interval: Scheduled for %.2f ms, actual %.2fms\n", + estintms * ident->mainloop_intervals, time_difference * 1000); + } ident->table_update_function(ident->table, now, ident->ewma_weight); +#ifdef SHADOW_ACCTING + + standard_table_update_flows((standard_flow_table) ident->shadow_table, now, + ident->ewma_weight); + +#endif + pthread_mutex_unlock(&ident->table_mutex); /* CLINK ! */ } @@ -46,27 +59,10 @@ static void estimate(identity_t *ident) { * Determines the FPS weight allocation when the identity is under its current * local rate limit. */ -static double allocate_fps_under_limit(identity_t *ident, uint32_t local_rate, double peer_weights) { - uint32_t target = local_rate; +static double allocate_fps_under_limit(identity_t *ident, uint32_t target, double peer_weights) { double ideal_weight; double total_weight = peer_weights + ident->last_localweight; - if (ident->flowstart) { - target = local_rate*4; - if (local_rate >= FLOW_START_THRESHOLD) { - ident->flowstart = false; - } - } - else { - /* June 16, 2008 (KCW) - * ident->flowstart gets set initially to one, but it is never set again. However, - * if a limiter gets flows and then the number of flows drops to zero, it has trouble - * increasing the limit again. */ - if (local_rate < FLOW_START_THRESHOLD) { - ident->flowstart = true; - } - } - if (target >= ident->limit) { ideal_weight = total_weight; } else if (target <= 0) { @@ -101,12 +97,14 @@ static double allocate_fps_under_limit(identity_t *ident, uint32_t local_rate, d */ static double allocate_fps_over_limit(identity_t *ident) { double ideal_weight; + double total_over_max; if (ident->common.max_flow_rate > 0) { ideal_weight = (double) ident->locallimit / (double) ident->common.max_flow_rate; + total_over_max = (double) ident->common.rate / (double) ident->common.max_flow_rate; - printlog(LOG_DEBUG, "%.3f %d %d %d FlowCount, Limit, MaxRate, TotalRate\n", - ideal_weight, ident->locallimit, ident->common.max_flow_rate, ident->common.rate); + printlog(LOG_DEBUG, "ideal_over: %.3f, limit: %d, max_flow_rate: %d, total_rate: %d, total/max: %.3f\n", + ideal_weight, ident->locallimit, ident->common.max_flow_rate, ident->common.rate, total_over_max); } else { ideal_weight = 1; } @@ -114,11 +112,302 @@ static double allocate_fps_over_limit(identity_t *ident) { return ideal_weight; } +/** + * When FPS checks to see which mode it should be operating in + * (over limit vs under limit), we don't want it to actually look to + * see if we're at the limit. Instead, we want to see if we're getting + * close to the limit. This defines how close is "close enough". + * + * For example, if the limit is 50000 and we're sending 49000, we probably + * want to be in the over limit mode, even if we aren't actually over the limit + * in order to switch to the more aggressive weight calculations. + */ +static inline uint32_t close_enough(uint32_t limit) { + uint32_t difference = limit - (limit * CLOSE_ENOUGH); + + if (difference < 2500) { + return (limit - 2500); + } else { + return (limit * CLOSE_ENOUGH); + } +} + +static void print_statistics(identity_t *ident, const double ideal_weight, + const double total_weight, const double localweight, + const char *identifier, common_accounting_t *table, + const uint32_t resulting_limit) { + struct timeval tv; + double time_now; + + gettimeofday(&tv, NULL); + time_now = (double) tv.tv_sec + (double) ((double) tv.tv_usec / (double) 1000000); + + printlog(LOG_WARN, "%.2f %d %.2f %.2f %.2f %d %d %d %d %d %d %d %d %d %s:%d ", + time_now, table->inst_rate, ideal_weight, localweight, total_weight, + table->num_flows, table->num_flows_5k, table->num_flows_10k, + table->num_flows_20k, table->num_flows_50k, table->avg_rate, + table->max_flow_rate, table->max_flow_rate_flow_hash, resulting_limit, + identifier, ident->id); + + if (table->max_flow_rate > 0) { + printlog(LOG_WARN, "%.3f\n", (double) table->rate / (double) table->max_flow_rate); + } else { + printlog(LOG_WARN, "0\n"); + } + + /* Print to the screen in debug mode. */ + if (system_loglevel == LOG_DEBUG) { + printf("Local Rate: %d, Ideal Weight: %.3f, Local Weight: %.3f, Total Weight: %.3f\n", + table->rate, ideal_weight, ident->localweight, total_weight); + } +} + +static uint32_t allocate_fps(identity_t *ident, double total_weight, + common_accounting_t *table, const char *identifier) { + + uint32_t resulting_limit = 0; + double ideal_weight = 0.0; + double peer_weights = total_weight - ident->last_localweight; + + /* Keep track of these for measurements & comparisons only. */ + double ideal_under = 0.0; + double ideal_over = 0.0; + + /* Weight sanity. */ + if (peer_weights < 0.0) { + peer_weights = 0.0; + } + + if (ident->dampen_state == DAMPEN_TEST) { + int64_t rate_delta = (int64_t) table->inst_rate - (int64_t) table->last_inst_rate; + double threshold = (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10; + + if (rate_delta > threshold) { + ident->dampen_state = DAMPEN_PASSED; + } else { + ident->dampen_state = DAMPEN_FAILED; + } + } + + /* Rate/weight sanity. */ + if (table->rate <= 0) { + ideal_weight = 0.0; + } + + /* Under the limit OR we failed our dampening test OR our current + * outgoing traffic rate is under the low "flowstart" watermark. */ + else if (ident->dampen_state == DAMPEN_FAILED || + table->rate < close_enough(ident->locallimit)) { +#if 0 + || ident->flowstart) { + uint32_t target_rate = table->rate; + + if (ident->flowstart) { + target_rate *= 4; + + if (table->rate >= FLOW_START_THRESHOLD) { + ident->flowstart = false; + } + } else { + /* June 16, 2008 (KCW) + * ident->flowstart gets set initially to one, but it is never set again. However, + * if a limiter gets flows and then the number of flows drops to zero, it has trouble + * increasing the limit again. */ + if (table->rate < FLOW_START_THRESHOLD) { + ident->flowstart = true; + } + } + Old flowstart code. +#endif + + /* Boost low-limits so that they have room to grow. */ + if (table->rate < FLOW_START_THRESHOLD) { + ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate * 4, peer_weights); + } else { + ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights); + } + + ideal_over = allocate_fps_over_limit(ident); + + if (ideal_over < ideal_under) { + /* Degenerate case in which the agressive weight calculation was + * actually less than the under-the-limit case. Use it instead + * and skip the dampening check in the next interval. */ + ideal_weight = ideal_over; + ident->dampen_state = DAMPEN_SKIP; + } else { + ident->dampen_state = DAMPEN_NONE; + } + + /* Apply EWMA. */ + ident->localweight = (ident->localweight * ident->ewma_weight + + ideal_weight * (1 - ident->ewma_weight)); + } + + /* At or over the limit. Use the aggressive weight calculation. */ + else { + double portion_last_interval = 0.0; + double portion_this_interval = 0.0; + + ideal_weight = ideal_over = allocate_fps_over_limit(ident); + ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights); + + /* Apply EWMA. */ + ident->localweight = (ident->localweight * ident->ewma_weight + + ideal_weight * (1 - ident->ewma_weight)); + + /* Now check whether the result of the aggressive weight calculation + * increases our portion of the weight "too much", in which case we + * dampen it. */ + + /* Our portion of weight in the whole system during the last interval.*/ + portion_last_interval = ident->last_localweight / total_weight; + + /* Our proposed portion of weight for the current interval. */ + portion_this_interval = ident->localweight / (peer_weights + ident->localweight); + + if (ident->dampen_state == DAMPEN_NONE && + (portion_this_interval - portion_last_interval > LARGE_INCREASE_PERCENTAGE)) { + ident->localweight = ident->last_localweight + (LARGE_INCREASE_PERCENTAGE * total_weight); + ident->dampen_state = DAMPEN_TEST; + } else { + ident->dampen_state = DAMPEN_SKIP; + } + } + + /* Add the weight calculated in this interval to the total. */ + ident->total_weight = total_weight = ident->localweight + peer_weights; + + /* Convert weight value into a rate limit. If there is no measureable + * weight, do a L/n allocation. */ + if (total_weight > 0) { + resulting_limit = (uint32_t) (ident->localweight * ident->limit / total_weight); + } else { + resulting_limit = (uint32_t) (ident->limit / (ident->comm.remote_node_count + 1)); + } + + print_statistics(ident, ideal_weight, total_weight, ident->localweight, + identifier, table, resulting_limit); + + return resulting_limit; +} + +#ifdef SHADOW_ACCTING + +/* Runs through the allocate functionality without making any state changes to + * the identity. Useful for comparisons, especially for comparing standard + * and sample&hold accounting schemes. */ +static void allocate_fps_pretend(identity_t *ident, double total_weight, + common_accounting_t *table, const char *identifier) { + + uint32_t resulting_limit = 0; + double ideal_weight = 0.0; + double peer_weights = total_weight - ident->last_localweight_copy; + double ideal_under = 0.0; + double ideal_over = 0.0; + + if (peer_weights < 0.0) { + peer_weights = 0.0; + } + + if (ident->dampen_state_copy == DAMPEN_TEST) { + int64_t rate_delta = (int64_t) table->inst_rate - (int64_t) table->last_inst_rate; + double threshold = (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10; + + if (rate_delta > threshold) { + ident->dampen_state_copy = DAMPEN_PASSED; + } else { + ident->dampen_state_copy = DAMPEN_FAILED; + } + } + + /* Rate/weight sanity. */ + if (table->rate <= 0) { + ideal_weight = 0.0; + } + + /* Under the limit OR we failed our dampening test OR our current + * outgoing traffic rate is under the low "flowstart" watermark. */ + else if (ident->dampen_state_copy == DAMPEN_FAILED || + table->rate < close_enough(ident->locallimit)) { + + /* Boost low-limits so that they have room to grow. */ + if (table->rate < FLOW_START_THRESHOLD) { + ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate * 4, peer_weights); + } else { + ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights); + } + + ideal_over = allocate_fps_over_limit(ident); + + if (ideal_over < ideal_under) { + /* Degenerate case in which the agressive weight calculation was + * actually less than the under-the-limit case. Use it instead + * and skip the dampening check in the next interval. */ + ideal_weight = ideal_over; + ident->dampen_state_copy = DAMPEN_SKIP; + } else { + ident->dampen_state_copy = DAMPEN_NONE; + } + + /* Apply EWMA. */ + ident->localweight_copy = (ident->localweight_copy * ident->ewma_weight + + ideal_weight * (1 - ident->ewma_weight)); + } + + /* At or over the limit. Use the aggressive weight calculation. */ + else { + double portion_last_interval = 0.0; + double portion_this_interval = 0.0; + + ideal_weight = ideal_over = allocate_fps_over_limit(ident); + ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights); + + /* Apply EWMA. */ + ident->localweight_copy = (ident->localweight_copy * ident->ewma_weight + + ideal_weight * (1 - ident->ewma_weight)); + + /* Now check whether the result of the aggressive weight calculation + * increases our portion of the weight "too much", in which case we + * dampen it. */ + + /* Our portion of weight in the whole system during the last interval.*/ + portion_last_interval = ident->last_localweight / total_weight; + + /* Our proposed portion of weight for the current interval. */ + portion_this_interval = ident->localweight_copy / (peer_weights + ident->localweight_copy); + + if (ident->dampen_state_copy == DAMPEN_NONE && + (portion_this_interval - portion_last_interval > LARGE_INCREASE_PERCENTAGE)) { + ident->localweight_copy = ident->last_localweight + (LARGE_INCREASE_PERCENTAGE * total_weight); + ident->dampen_state_copy = DAMPEN_TEST; + } else { + ident->dampen_state_copy = DAMPEN_SKIP; + } + } + + /* Add the weight calculated in this interval to the total. */ + total_weight = ident->localweight_copy + peer_weights; + + /* Convert weight value into a rate limit. If there is no measureable + * weight, do a L/n allocation. */ + if (total_weight > 0) { + resulting_limit = (uint32_t) (ident->localweight_copy * ident->limit / total_weight); + } else { + resulting_limit = (uint32_t) (ident->limit / (ident->comm.remote_node_count + 1)); + } + + print_statistics(ident, ideal_weight, total_weight, ident->localweight_copy, + identifier, table, resulting_limit); +} + +#endif + /** * Determines the amount of FPS weight to allocate to the identity during each * estimate interval. Note that total_weight includes local weight. */ -static uint32_t allocate_fps(identity_t *ident, double total_weight) { +static uint32_t allocate_fps_old(identity_t *ident, double total_weight) { common_accounting_t *ftable = &ident->common; /* Common flow table info */ uint32_t local_rate = ftable->rate; uint32_t ideallocal = 0; @@ -160,7 +449,7 @@ static uint32_t allocate_fps(identity_t *ident, double total_weight) { if (local_rate <= 0) { idealweight = 0; } else if (dampen_increase == 0 && - (ident->locallimit <= 0 || local_rate < (ident->locallimit * CLOSE_ENOUGH) || ident->flowstart)) { + (ident->locallimit <= 0 || local_rate < close_enough(ident->locallimit) || ident->flowstart)) { /* We're under the limit - all flows are bottlenecked. */ idealweight = allocate_fps_under_limit(ident, local_rate, peer_weights); ideal_over = allocate_fps_over_limit(ident); @@ -236,6 +525,7 @@ static uint32_t allocate_fps(identity_t *ident, double total_weight) { local_rate, idealweight, ident->localweight, total_weight); } +#if 0 if (printcounter <= 0) { struct timeval tv; double time_now; @@ -260,6 +550,8 @@ static uint32_t allocate_fps(identity_t *ident, double total_weight) { printlog(LOG_DEBUG, "MIN: min said to use flow counting, which was %.3f when other method said %.3f.\n", ideal_over, ideal_under); } + See print_statistics() +#endif printlog(LOG_DEBUG, "ideallocal is %d\n", ideallocal); @@ -312,9 +604,16 @@ static void allocate(limiter_t *limiter, identity_t *ident) { ident->avg_bytes += ident->common.rate; if (limiter->policy == POLICY_FPS) { - ident->locallimit = allocate_fps(ident, comm_val); +#ifdef SHADOW_ACCTING + + allocate_fps_pretend(ident, comm_val, &ident->shadow_common, "SHADOW-ID"); + + ident->last_localweight_copy = ident->localweight_copy; +#endif + + ident->locallimit = allocate_fps(ident, comm_val, &ident->common, "ID"); ident->last_localweight = ident->localweight; - + /* Update other limiters with our weight by writing to comm layer. */ write_local_value(&ident->comm, ident->localweight); } else { @@ -385,9 +684,17 @@ static void enforce(limiter_t *limiter, identity_t *ident) { } printlog(LOG_DEBUG, "%d Limit ID:%d\n", ident->locallimit, ident->id); +#if 0 if (printcounter == PRINT_COUNTER_RESET) { - printlog(LOG_WARN, "%d\n", ident->locallimit); + if (ident->common.max_flow_rate > 0) { + printlog(LOG_WARN, "%d ID:%d %.3f\n", ident->locallimit, ident->id, + (double) ident->common.rate / (double) ident->common.max_flow_rate); + } else { + printlog(LOG_WARN, "%d ID:%d 0\n", ident->locallimit, ident->id); + } } + This is now done in print_statistics() +#endif snprintf(cmd, CMD_BUFFER_SIZE, "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %dbps quantum 1600", @@ -398,7 +705,7 @@ static void enforce(limiter_t *limiter, identity_t *ident) { if (ret) { /* FIXME: call failed. What to do? */ - printlog(LOG_CRITICAL, "***TC call failed?***\n"); + printlog(LOG_CRITICAL, "***TC call failed? Call was: %s***\n", cmd); } } break; @@ -476,6 +783,12 @@ static void clean(drl_instance_t *instance) { ident->table_cleanup_function(ident->table); +#ifdef SHADOW_ACCTING + + standard_table_cleanup((standard_flow_table) ident->shadow_table); + +#endif + pthread_mutex_unlock(&ident->table_mutex); } @@ -505,11 +818,11 @@ static void print_averages(drl_instance_t *instance, int print_interval) { * of identities. * * Each identity also has a private lock for its table. This gets locked by - * table-modifying functions such as estimate and clean. + * table-modifying functions such as estimate and clean. It's also locked in + * ulogd_DRL.c when the table is being updated with new packets. */ void handle_estimation(void *arg) { limiter_t *limiter = (limiter_t *) arg; - identity_t *ident = NULL; int clean_timer, clean_wait_intervals; useconds_t sleep_time = limiter->estintms * 1000; uint32_t cal_slot = 0; @@ -520,6 +833,7 @@ void handle_estimation(void *arg) { sigemptyset(&signal_mask); sigaddset(&signal_mask, SIGHUP); sigaddset(&signal_mask, SIGUSR1); + sigaddset(&signal_mask, SIGUSR2); pthread_sigmask(SIG_BLOCK, &signal_mask, NULL); /* Determine the number of intervals we should wait before hitting the @@ -528,6 +842,8 @@ void handle_estimation(void *arg) { clean_timer = clean_wait_intervals; while (true) { + printlog(LOG_DEBUG, "--Beginning new tick.--\n"); + /* Sleep according to the delay of the estimate interval. */ usleep(sleep_time); @@ -540,26 +856,53 @@ void handle_estimation(void *arg) { /* Service all the identities that are scheduled to run during this * tick. */ while (!TAILQ_EMPTY(limiter->stable_instance.cal + cal_slot)) { - ident = TAILQ_FIRST(limiter->stable_instance.cal + cal_slot); - TAILQ_REMOVE(limiter->stable_instance.cal + cal_slot, ident, calendar); + identity_action *iaction = TAILQ_FIRST(limiter->stable_instance.cal + cal_slot); + TAILQ_REMOVE(limiter->stable_instance.cal + cal_slot, iaction, calendar); - /* Update the ident's flow accouting table with the latest info. */ - estimate(ident); + /* Only execute the action if it is valid. */ + if (iaction->valid == 0) { + free(iaction); + continue; + } + + switch (iaction->action) { + case ACTION_MAINLOOP: + + printlog(LOG_DEBUG, "Main loop: identity %d\n", iaction->ident->id); + + /* Update the ident's flow accouting table with the latest info. */ + estimate(iaction->ident, limiter->estintms); + + /* Determine its share of the rate allocation. */ + allocate(limiter, iaction->ident); - /* Determine its share of the rate allocation. */ - allocate(limiter, ident); + /* Make tc calls to enforce the rate we decided upon. */ + enforce(limiter, iaction->ident); - /* Make tc calls to enforce the rate we decided upon. */ - enforce(limiter, ident); + /* Add ident back to the queue at a future time slot. */ + TAILQ_INSERT_TAIL(limiter->stable_instance.cal + + ((cal_slot + iaction->ident->mainloop_intervals) & SCHEDMASK), + iaction, calendar); + break; - /* Tell the comm library to propagate this identity's result for - * this interval.*/ - send_update(&ident->comm, ident->id); + case ACTION_COMMUNICATE: - /* Add ident back to the queue at a future time slot. */ - TAILQ_INSERT_TAIL(limiter->stable_instance.cal + - ((cal_slot + ident->intervals) & SCHEDMASK), - ident, calendar); + printlog(LOG_DEBUG, "Communicating: identity %d\n", iaction->ident->id); + + /* Tell the comm library to propagate this identity's result for + * this interval.*/ + send_update(&iaction->ident->comm, iaction->ident->id); + + /* Add ident back to the queue at a future time slot. */ + TAILQ_INSERT_TAIL(limiter->stable_instance.cal + + ((cal_slot + iaction->ident->communication_intervals) & SCHEDMASK), + iaction, calendar); + break; + + default: + printlog(LOG_CRITICAL, "Unknown identity action!?!\n"); + exit(EXIT_FAILURE); + } } print_interval--;