X-Git-Url: http://git.onelab.eu/?p=distributedratelimiting.git;a=blobdiff_plain;f=drl%2Festimate.c;h=24adbbf8d81d5b7743983a57be94782d8d26557f;hp=21edcbf0a5ae8de9d6c2570bc0c8ea40112fd37f;hb=c9d6255f0c06ee41eb2c06a5f74a957ec7be3223;hpb=f83340496f632165030cc92cd98408a87082f6b1 diff --git a/drl/estimate.c b/drl/estimate.c index 21edcbf..24adbbf 100644 --- a/drl/estimate.c +++ b/drl/estimate.c @@ -38,7 +38,7 @@ static void estimate(identity_t *ident, const double estintms) { time_difference = timeval_subtract(now, ident->common.last_update); - if (time_difference > 1.05 * (estintms / 1000 * ident->mainloop_intervals)) { + if (time_difference > .01 + (estintms / 1000 * ident->mainloop_intervals)) { printlog(LOG_WARN, "Missed interval: Scheduled for %.2f ms, actual %.2fms\n", estintms * ident->mainloop_intervals, time_difference * 1000); } @@ -63,31 +63,14 @@ static double allocate_fps_under_limit(identity_t *ident, uint32_t target, doubl double ideal_weight; double total_weight = peer_weights + ident->last_localweight; - if (target >= ident->limit) { + if (target >= ident->effective_limit) { ideal_weight = total_weight; } else if (target <= 0) { ideal_weight = 0; // no flows here } else { - ideal_weight = ((double)target / (double)ident->limit) * total_weight; + ideal_weight = ((double)target / (double)ident->effective_limit) * total_weight; } -#if 0 - else if (peer_weights <= 0) { -#if 0 - // doesn't matter what we pick as our weight, so pick 1 / N. - ideal_weight = MAX_FLOW_SCALING_FACTOR / (remote_count(ident->i_handle) + 1); -#endif - ideal_weight = ((double)target / (double)ident->limit) * total_weight; - } else { -#if 0 - double divisor = (double) ident->limit - (double) target; - ideal_weight = ((double) target * peer_weights) / divisor; -#else - ideal_weight = ((double)target / (double)ident->limit) * total_weight; -#endif - } -#endif - return ideal_weight; } @@ -125,8 +108,8 @@ static double allocate_fps_over_limit(identity_t *ident) { static inline uint32_t close_enough(uint32_t limit) { uint32_t difference = limit - (limit * CLOSE_ENOUGH); - if (difference < 2500) { - return (limit - 2500); + if (difference < 10240) { + return (limit - 10240); } else { return (limit * CLOSE_ENOUGH); } @@ -180,7 +163,7 @@ static uint32_t allocate_fps(identity_t *ident, double total_weight, if (ident->dampen_state == DAMPEN_TEST) { int64_t rate_delta = (int64_t) table->inst_rate - (int64_t) table->last_inst_rate; - double threshold = (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10; + double threshold = (double) ident->effective_limit * (double) LARGE_INCREASE_PERCENTAGE / 10; if (rate_delta > threshold) { ident->dampen_state = DAMPEN_PASSED; @@ -219,6 +202,7 @@ static uint32_t allocate_fps(identity_t *ident, double total_weight, } Old flowstart code. #endif + //printf("rate is %d, close enough is %d, difference is %d\n", table->rate, close_enough(ident->locallimit), close_enough(ident->locallimit) - table->rate); /* Boost low-limits so that they have room to grow. */ if (table->rate < FLOW_START_THRESHOLD) { @@ -281,7 +265,7 @@ static uint32_t allocate_fps(identity_t *ident, double total_weight, /* Convert weight value into a rate limit. If there is no measureable * weight, do a L/n allocation. */ if (total_weight > 0) { - resulting_limit = (uint32_t) (ident->localweight * ident->limit / total_weight); + resulting_limit = (uint32_t) (ident->localweight * ident->effective_limit / total_weight); } else { resulting_limit = (uint32_t) (ident->limit / (ident->comm.remote_node_count + 1)); } @@ -312,7 +296,7 @@ static void allocate_fps_pretend(identity_t *ident, double total_weight, if (ident->dampen_state_copy == DAMPEN_TEST) { int64_t rate_delta = (int64_t) table->inst_rate - (int64_t) table->last_inst_rate; - double threshold = (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10; + double threshold = (double) ident->effective_limit * (double) LARGE_INCREASE_PERCENTAGE / 10; if (rate_delta > threshold) { ident->dampen_state_copy = DAMPEN_PASSED; @@ -392,7 +376,7 @@ static void allocate_fps_pretend(identity_t *ident, double total_weight, /* Convert weight value into a rate limit. If there is no measureable * weight, do a L/n allocation. */ if (total_weight > 0) { - resulting_limit = (uint32_t) (ident->localweight_copy * ident->limit / total_weight); + resulting_limit = (uint32_t) (ident->localweight_copy * ident->effective_limit / total_weight); } else { resulting_limit = (uint32_t) (ident->limit / (ident->comm.remote_node_count + 1)); } @@ -404,177 +388,49 @@ static void allocate_fps_pretend(identity_t *ident, double total_weight, #endif /** - * Determines the amount of FPS weight to allocate to the identity during each - * estimate interval. Note that total_weight includes local weight. + * Determines the local drop probability for a GRD identity every estimate + * interval. */ -static uint32_t allocate_fps_old(identity_t *ident, double total_weight) { - common_accounting_t *ftable = &ident->common; /* Common flow table info */ - uint32_t local_rate = ftable->rate; - uint32_t ideallocal = 0; - double peer_weights; /* sum of weights of all other limiters */ - double idealweight = 0; - double last_portion = 0; - double this_portion = 0; - - static int dampen = 0; - int dampen_increase = 0; - - double ideal_under = 0; - double ideal_over = 0; - - int regime = 0; - - /* two cases: - 1. the aggregate is < limit - 2. the aggregate is >= limit - */ - peer_weights = total_weight - ident->last_localweight; - if (peer_weights < 0) { - peer_weights = 0; - } - - if (dampen == 1) { - int64_t rate_delta = - (int64_t) ftable->inst_rate - (int64_t) ftable->last_inst_rate; - double threshold = - (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10; - - if (rate_delta > threshold) { - dampen_increase = 1; - printlog(LOG_DEBUG, "DAMPEN: delta(%.3f) thresh(%.3f)\n", - rate_delta, threshold); - } - } +static double allocate_grd(identity_t *ident, double aggdemand) { + double dropprob; + double min_dropprob = ident->drop_prob * GRD_BIG_DROP; - if (local_rate <= 0) { - idealweight = 0; - } else if (dampen_increase == 0 && - (ident->locallimit <= 0 || local_rate < close_enough(ident->locallimit) || ident->flowstart)) { - /* We're under the limit - all flows are bottlenecked. */ - idealweight = allocate_fps_under_limit(ident, local_rate, peer_weights); - ideal_over = allocate_fps_over_limit(ident); - ideal_under = idealweight; + struct timeval tv; + double time_now; + common_accounting_t *table = &ident->common; - if (ideal_over < idealweight) { - idealweight = ideal_over; - regime = 3; - dampen = 2; - } else { - regime = 1; - dampen = 0; - } + gettimeofday(&tv, NULL); + time_now = (double) tv.tv_sec + (double) ((double) tv.tv_usec / (double) 1000000); - /* Apply EWMA */ - ident->localweight = (ident->localweight * ident->ewma_weight + - idealweight * (1 - ident->ewma_weight)); - + if (aggdemand > ident->effective_limit) { + dropprob = (aggdemand - ident->effective_limit) / aggdemand; } else { - idealweight = allocate_fps_over_limit(ident); - - /* Apply EWMA */ - ident->localweight = (ident->localweight * ident->ewma_weight + - idealweight * (1 - ident->ewma_weight)); - - /* This is the portion of the total weight in the system that was caused - * by this limiter in the last interval. */ - last_portion = ident->last_localweight / total_weight; - - /* This is the fraction of the total weight in the system that our - * proposed value for idealweight would use. */ - this_portion = ident->localweight / (peer_weights + ident->localweight); - - /* Dampen the large increase the first time... */ - if (dampen == 0 && (this_portion - last_portion > LARGE_INCREASE_PERCENTAGE)) { - ident->localweight = ident->last_localweight + (LARGE_INCREASE_PERCENTAGE * total_weight); - dampen = 1; - } else { - dampen = 2; - } - - ideal_under = allocate_fps_under_limit(ident, local_rate, peer_weights); - ideal_over = idealweight; - - regime = 2; + dropprob = 0.0; } - /* Convert weight into a rate - add in our new local weight */ - ident->total_weight = total_weight = ident->localweight + peer_weights; - - /* compute local allocation: - if there is traffic elsewhere, use the weights - otherwise do a L/n allocation */ - if (total_weight > 0) { - //if (peer_weights > 0) { - ideallocal = (uint32_t) (ident->localweight * ident->limit / total_weight); - } else { - ideallocal = ident->limit / (ident->comm.remote_node_count + 1); + if (dropprob > 0.01 && dropprob < min_dropprob) { + dropprob = min_dropprob; } - printlog(LOG_DEBUG, "%.3f ActualWeight\n", ident->localweight); - - printlog(LOG_DEBUG, "%.3f %.3f %.3f %.3f Under / Over / Actual / Rate\n", - ideal_under / (ideal_under + peer_weights), - ideal_over / (ideal_over + peer_weights), - ident->localweight / (ident->localweight + peer_weights), - (double) local_rate / (double) ident->limit); - - printlog(LOG_DEBUG, "%.3f %.3f IdealUnd IdealOve\n",ideal_under,ideal_over); - if (system_loglevel == LOG_DEBUG) { - printf("local_rate: %d, idealweight: %.3f, localweight: %.3f, total_weight: %.3f\n", - local_rate, idealweight, ident->localweight, total_weight); - } - -#if 0 - if (printcounter <= 0) { - struct timeval tv; - double time_now; - - gettimeofday(&tv, NULL); - time_now = (double) tv.tv_sec + (double) ((double) tv.tv_usec / (double) 1000000); - - printlog(LOG_WARN, "%.2f %d %.2f %.2f %.2f %d %d %d %d %d %d %d %d ", time_now, ftable->inst_rate, - idealweight, ident->localweight, total_weight, ftable->num_flows, ftable->num_flows_5k, - ftable->num_flows_10k, ftable->num_flows_20k, ftable->num_flows_50k, ftable->avg_rate, - ftable->max_flow_rate, ftable->max_flow_rate_flow_hash); - - printcounter = PRINT_COUNTER_RESET; - } else { - printcounter -= 1; - } - - //printf("Dampen: %d, dampen_increase: %d, peer_weights: %.3f, regime: %d\n", - // dampen, dampen_increase, peer_weights, regime); - - if (regime == 3) { - printlog(LOG_DEBUG, "MIN: min said to use flow counting, which was %.3f when other method said %.3f.\n", - ideal_over, ideal_under); + printf("local rate: %d, aggregate demand: %.3f, drop prob: %.3f\n", + ident->common.rate, aggdemand, dropprob); } - See print_statistics() -#endif - - printlog(LOG_DEBUG, "ideallocal is %d\n", ideallocal); - - return(ideallocal); -} - -/** - * Determines the local drop probability for a GRD identity every estimate - * interval. - */ -static double allocate_grd(identity_t *ident, double aggdemand) { - double dropprob; - double global_limit = (double) (ident->limit); - if (aggdemand > global_limit) { - dropprob = (aggdemand-global_limit)/aggdemand; + if (table->max_flow_rate > 0) { + printlog(LOG_WARN, "%.2f %d 0 0 %.2f %d %d %d %d %d %d %d %d %.2f ID:%d %.3f\n", + time_now, table->inst_rate, aggdemand, + table->num_flows, table->num_flows_5k, table->num_flows_10k, + table->num_flows_20k, table->num_flows_50k, table->avg_rate, + table->max_flow_rate, table->max_flow_rate_flow_hash, dropprob, + ident->id, (double) table->rate / (double) table->max_flow_rate); } else { - dropprob = 0.0; - } - - if (system_loglevel == LOG_DEBUG) { - printf("local rate: %d, aggregate demand: %.3f, drop prob: %.3f\n", - ident->common.rate, aggdemand, dropprob); + printlog(LOG_WARN, "%.2f %d 0 0 %.2f %d %d %d %d %d %d %d %d %.2f ID:%d 0\n", + time_now, table->inst_rate, aggdemand, + table->num_flows, table->num_flows_5k, table->num_flows_10k, + table->num_flows_20k, table->num_flows_50k, table->avg_rate, + table->max_flow_rate, table->max_flow_rate_flow_hash, dropprob, + ident->id); } return dropprob; @@ -586,18 +442,12 @@ static double allocate_grd(identity_t *ident, double aggdemand) { */ static void allocate(limiter_t *limiter, identity_t *ident) { /* Represents aggregate rate for GRD and aggregate weight for FPS. */ - double comm_val = 0; - - /* Read comm_val from comm layer. */ - if (limiter->policy == POLICY_FPS) { - read_comm(&ident->comm, &comm_val, - ident->total_weight / (double) (ident->comm.remote_node_count + 1)); - } else { - read_comm(&ident->comm, &comm_val, - (double) (ident->limit / (double) (ident->comm.remote_node_count + 1))); - } - printlog(LOG_DEBUG, "%.3f Aggregate weight/rate (FPS/GRD)\n", comm_val); + double aggregate = 0; + /* Read aggregate from comm layer. */ + read_comm(&aggregate, &ident->effective_limit, &ident->comm, ident->limit); + printlog(LOG_DEBUG, "%.3f Aggregate weight/rate (FPS/GRD)\n", aggregate); + /* Experimental printing. */ printlog(LOG_DEBUG, "%.3f \t Kbps used rate. ID:%d\n", (double) ident->common.rate / (double) 128, ident->id); @@ -606,20 +456,19 @@ static void allocate(limiter_t *limiter, identity_t *ident) { if (limiter->policy == POLICY_FPS) { #ifdef SHADOW_ACCTING - allocate_fps_pretend(ident, comm_val, &ident->shadow_common, "SHADOW-ID"); + allocate_fps_pretend(ident, aggregate, &ident->shadow_common, "SHADOW-ID"); ident->last_localweight_copy = ident->localweight_copy; #endif - ident->locallimit = allocate_fps(ident, comm_val, &ident->common, "ID"); + ident->locallimit = allocate_fps(ident, aggregate, &ident->common, "ID"); ident->last_localweight = ident->localweight; /* Update other limiters with our weight by writing to comm layer. */ write_local_value(&ident->comm, ident->localweight); } else { - ident->locallimit = 0; /* Unused with GRD. */ ident->last_drop_prob = ident->drop_prob; - ident->drop_prob = allocate_grd(ident, comm_val); + ident->drop_prob = allocate_grd(ident, aggregate); /* Update other limiters with our rate by writing to comm layer. */ write_local_value(&ident->comm, ident->common.rate); @@ -739,17 +588,11 @@ static void enforce(limiter_t *limiter, identity_t *ident) { } /* Make the call to tc. */ -#ifdef DELAY40MS - snprintf(cmd, CMD_BUFFER_SIZE, - "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 40ms", - ident->leaves[i]->xid, ident->leaves[i]->xid, - (100 * ident->leaves[i]->drop_prob)); -#else snprintf(cmd, CMD_BUFFER_SIZE, - "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 0ms", + "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay %dms", ident->leaves[i]->xid, ident->leaves[i]->xid, - (100 * ident->leaves[i]->drop_prob)); -#endif + (100 * ident->leaves[i]->drop_prob), ident->leaves[i]->delay); + if (do_enforcement) { ret = system(cmd);