time_difference = timeval_subtract(now, ident->common.last_update);
- if (time_difference > 1.05 * (estintms / 1000 * ident->mainloop_intervals)) {
+ if (time_difference > .01 + (estintms / 1000 * ident->mainloop_intervals)) {
printlog(LOG_WARN, "Missed interval: Scheduled for %.2f ms, actual %.2fms\n",
estintms * ident->mainloop_intervals, time_difference * 1000);
}
double ideal_weight;
double total_weight = peer_weights + ident->last_localweight;
- if (target >= ident->limit) {
+ if (target >= ident->effective_limit) {
ideal_weight = total_weight;
} else if (target <= 0) {
ideal_weight = 0; // no flows here
} else {
- ideal_weight = ((double)target / (double)ident->limit) * total_weight;
+ ideal_weight = ((double)target / (double)ident->effective_limit) * total_weight;
}
-#if 0
- else if (peer_weights <= 0) {
-#if 0
- // doesn't matter what we pick as our weight, so pick 1 / N.
- ideal_weight = MAX_FLOW_SCALING_FACTOR / (remote_count(ident->i_handle) + 1);
-#endif
- ideal_weight = ((double)target / (double)ident->limit) * total_weight;
- } else {
-#if 0
- double divisor = (double) ident->limit - (double) target;
- ideal_weight = ((double) target * peer_weights) / divisor;
-#else
- ideal_weight = ((double)target / (double)ident->limit) * total_weight;
-#endif
- }
-#endif
-
return ideal_weight;
}
static inline uint32_t close_enough(uint32_t limit) {
uint32_t difference = limit - (limit * CLOSE_ENOUGH);
- if (difference < 2500) {
- return (limit - 2500);
+ if (difference < 10240) {
+ return (limit - 10240);
} else {
return (limit * CLOSE_ENOUGH);
}
if (ident->dampen_state == DAMPEN_TEST) {
int64_t rate_delta = (int64_t) table->inst_rate - (int64_t) table->last_inst_rate;
- double threshold = (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
+ double threshold = (double) ident->effective_limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
if (rate_delta > threshold) {
ident->dampen_state = DAMPEN_PASSED;
}
Old flowstart code.
#endif
+ //printf("rate is %d, close enough is %d, difference is %d\n", table->rate, close_enough(ident->locallimit), close_enough(ident->locallimit) - table->rate);
/* Boost low-limits so that they have room to grow. */
if (table->rate < FLOW_START_THRESHOLD) {
/* Convert weight value into a rate limit. If there is no measureable
* weight, do a L/n allocation. */
if (total_weight > 0) {
- resulting_limit = (uint32_t) (ident->localweight * ident->limit / total_weight);
+ resulting_limit = (uint32_t) (ident->localweight * ident->effective_limit / total_weight);
} else {
resulting_limit = (uint32_t) (ident->limit / (ident->comm.remote_node_count + 1));
}
if (ident->dampen_state_copy == DAMPEN_TEST) {
int64_t rate_delta = (int64_t) table->inst_rate - (int64_t) table->last_inst_rate;
- double threshold = (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
+ double threshold = (double) ident->effective_limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
if (rate_delta > threshold) {
ident->dampen_state_copy = DAMPEN_PASSED;
/* Convert weight value into a rate limit. If there is no measureable
* weight, do a L/n allocation. */
if (total_weight > 0) {
- resulting_limit = (uint32_t) (ident->localweight_copy * ident->limit / total_weight);
+ resulting_limit = (uint32_t) (ident->localweight_copy * ident->effective_limit / total_weight);
} else {
resulting_limit = (uint32_t) (ident->limit / (ident->comm.remote_node_count + 1));
}
#endif
/**
- * Determines the amount of FPS weight to allocate to the identity during each
- * estimate interval. Note that total_weight includes local weight.
+ * Determines the local drop probability for a GRD identity every estimate
+ * interval.
*/
-static uint32_t allocate_fps_old(identity_t *ident, double total_weight) {
- common_accounting_t *ftable = &ident->common; /* Common flow table info */
- uint32_t local_rate = ftable->rate;
- uint32_t ideallocal = 0;
- double peer_weights; /* sum of weights of all other limiters */
- double idealweight = 0;
- double last_portion = 0;
- double this_portion = 0;
-
- static int dampen = 0;
- int dampen_increase = 0;
-
- double ideal_under = 0;
- double ideal_over = 0;
-
- int regime = 0;
-
- /* two cases:
- 1. the aggregate is < limit
- 2. the aggregate is >= limit
- */
- peer_weights = total_weight - ident->last_localweight;
- if (peer_weights < 0) {
- peer_weights = 0;
- }
-
- if (dampen == 1) {
- int64_t rate_delta =
- (int64_t) ftable->inst_rate - (int64_t) ftable->last_inst_rate;
- double threshold =
- (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
-
- if (rate_delta > threshold) {
- dampen_increase = 1;
- printlog(LOG_DEBUG, "DAMPEN: delta(%.3f) thresh(%.3f)\n",
- rate_delta, threshold);
- }
- }
+static double allocate_grd(identity_t *ident, double aggdemand) {
+ double dropprob;
+ double min_dropprob = ident->drop_prob * GRD_BIG_DROP;
- if (local_rate <= 0) {
- idealweight = 0;
- } else if (dampen_increase == 0 &&
- (ident->locallimit <= 0 || local_rate < close_enough(ident->locallimit) || ident->flowstart)) {
- /* We're under the limit - all flows are bottlenecked. */
- idealweight = allocate_fps_under_limit(ident, local_rate, peer_weights);
- ideal_over = allocate_fps_over_limit(ident);
- ideal_under = idealweight;
+ struct timeval tv;
+ double time_now;
+ common_accounting_t *table = &ident->common;
- if (ideal_over < idealweight) {
- idealweight = ideal_over;
- regime = 3;
- dampen = 2;
- } else {
- regime = 1;
- dampen = 0;
- }
+ gettimeofday(&tv, NULL);
+ time_now = (double) tv.tv_sec + (double) ((double) tv.tv_usec / (double) 1000000);
- /* Apply EWMA */
- ident->localweight = (ident->localweight * ident->ewma_weight +
- idealweight * (1 - ident->ewma_weight));
-
+ if (aggdemand > ident->effective_limit) {
+ dropprob = (aggdemand - ident->effective_limit) / aggdemand;
} else {
- idealweight = allocate_fps_over_limit(ident);
-
- /* Apply EWMA */
- ident->localweight = (ident->localweight * ident->ewma_weight +
- idealweight * (1 - ident->ewma_weight));
-
- /* This is the portion of the total weight in the system that was caused
- * by this limiter in the last interval. */
- last_portion = ident->last_localweight / total_weight;
-
- /* This is the fraction of the total weight in the system that our
- * proposed value for idealweight would use. */
- this_portion = ident->localweight / (peer_weights + ident->localweight);
-
- /* Dampen the large increase the first time... */
- if (dampen == 0 && (this_portion - last_portion > LARGE_INCREASE_PERCENTAGE)) {
- ident->localweight = ident->last_localweight + (LARGE_INCREASE_PERCENTAGE * total_weight);
- dampen = 1;
- } else {
- dampen = 2;
- }
-
- ideal_under = allocate_fps_under_limit(ident, local_rate, peer_weights);
- ideal_over = idealweight;
-
- regime = 2;
+ dropprob = 0.0;
}
- /* Convert weight into a rate - add in our new local weight */
- ident->total_weight = total_weight = ident->localweight + peer_weights;
-
- /* compute local allocation:
- if there is traffic elsewhere, use the weights
- otherwise do a L/n allocation */
- if (total_weight > 0) {
- //if (peer_weights > 0) {
- ideallocal = (uint32_t) (ident->localweight * ident->limit / total_weight);
- } else {
- ideallocal = ident->limit / (ident->comm.remote_node_count + 1);
+ if (dropprob > 0.01 && dropprob < min_dropprob) {
+ dropprob = min_dropprob;
}
- printlog(LOG_DEBUG, "%.3f ActualWeight\n", ident->localweight);
-
- printlog(LOG_DEBUG, "%.3f %.3f %.3f %.3f Under / Over / Actual / Rate\n",
- ideal_under / (ideal_under + peer_weights),
- ideal_over / (ideal_over + peer_weights),
- ident->localweight / (ident->localweight + peer_weights),
- (double) local_rate / (double) ident->limit);
-
- printlog(LOG_DEBUG, "%.3f %.3f IdealUnd IdealOve\n",ideal_under,ideal_over);
-
if (system_loglevel == LOG_DEBUG) {
- printf("local_rate: %d, idealweight: %.3f, localweight: %.3f, total_weight: %.3f\n",
- local_rate, idealweight, ident->localweight, total_weight);
- }
-
-#if 0
- if (printcounter <= 0) {
- struct timeval tv;
- double time_now;
-
- gettimeofday(&tv, NULL);
- time_now = (double) tv.tv_sec + (double) ((double) tv.tv_usec / (double) 1000000);
-
- printlog(LOG_WARN, "%.2f %d %.2f %.2f %.2f %d %d %d %d %d %d %d %d ", time_now, ftable->inst_rate,
- idealweight, ident->localweight, total_weight, ftable->num_flows, ftable->num_flows_5k,
- ftable->num_flows_10k, ftable->num_flows_20k, ftable->num_flows_50k, ftable->avg_rate,
- ftable->max_flow_rate, ftable->max_flow_rate_flow_hash);
-
- printcounter = PRINT_COUNTER_RESET;
- } else {
- printcounter -= 1;
- }
-
- //printf("Dampen: %d, dampen_increase: %d, peer_weights: %.3f, regime: %d\n",
- // dampen, dampen_increase, peer_weights, regime);
-
- if (regime == 3) {
- printlog(LOG_DEBUG, "MIN: min said to use flow counting, which was %.3f when other method said %.3f.\n",
- ideal_over, ideal_under);
+ printf("local rate: %d, aggregate demand: %.3f, drop prob: %.3f\n",
+ ident->common.rate, aggdemand, dropprob);
}
- See print_statistics()
-#endif
-
- printlog(LOG_DEBUG, "ideallocal is %d\n", ideallocal);
-
- return(ideallocal);
-}
-
-/**
- * Determines the local drop probability for a GRD identity every estimate
- * interval.
- */
-static double allocate_grd(identity_t *ident, double aggdemand) {
- double dropprob;
- double global_limit = (double) (ident->limit);
- if (aggdemand > global_limit) {
- dropprob = (aggdemand-global_limit)/aggdemand;
+ if (table->max_flow_rate > 0) {
+ printlog(LOG_WARN, "%.2f %d 0 0 %.2f %d %d %d %d %d %d %d %d %.2f ID:%d %.3f\n",
+ time_now, table->inst_rate, aggdemand,
+ table->num_flows, table->num_flows_5k, table->num_flows_10k,
+ table->num_flows_20k, table->num_flows_50k, table->avg_rate,
+ table->max_flow_rate, table->max_flow_rate_flow_hash, dropprob,
+ ident->id, (double) table->rate / (double) table->max_flow_rate);
} else {
- dropprob = 0.0;
- }
-
- if (system_loglevel == LOG_DEBUG) {
- printf("local rate: %d, aggregate demand: %.3f, drop prob: %.3f\n",
- ident->common.rate, aggdemand, dropprob);
+ printlog(LOG_WARN, "%.2f %d 0 0 %.2f %d %d %d %d %d %d %d %d %.2f ID:%d 0\n",
+ time_now, table->inst_rate, aggdemand,
+ table->num_flows, table->num_flows_5k, table->num_flows_10k,
+ table->num_flows_20k, table->num_flows_50k, table->avg_rate,
+ table->max_flow_rate, table->max_flow_rate_flow_hash, dropprob,
+ ident->id);
}
return dropprob;
*/
static void allocate(limiter_t *limiter, identity_t *ident) {
/* Represents aggregate rate for GRD and aggregate weight for FPS. */
- double comm_val = 0;
-
- /* Read comm_val from comm layer. */
- if (limiter->policy == POLICY_FPS) {
- read_comm(&ident->comm, &comm_val,
- ident->total_weight / (double) (ident->comm.remote_node_count + 1));
- } else {
- read_comm(&ident->comm, &comm_val,
- (double) (ident->limit / (double) (ident->comm.remote_node_count + 1)));
- }
- printlog(LOG_DEBUG, "%.3f Aggregate weight/rate (FPS/GRD)\n", comm_val);
+ double aggregate = 0;
+ /* Read aggregate from comm layer. */
+ read_comm(&aggregate, &ident->effective_limit, &ident->comm, ident->limit);
+ printlog(LOG_DEBUG, "%.3f Aggregate weight/rate (FPS/GRD)\n", aggregate);
+
/* Experimental printing. */
printlog(LOG_DEBUG, "%.3f \t Kbps used rate. ID:%d\n",
(double) ident->common.rate / (double) 128, ident->id);
if (limiter->policy == POLICY_FPS) {
#ifdef SHADOW_ACCTING
- allocate_fps_pretend(ident, comm_val, &ident->shadow_common, "SHADOW-ID");
+ allocate_fps_pretend(ident, aggregate, &ident->shadow_common, "SHADOW-ID");
ident->last_localweight_copy = ident->localweight_copy;
#endif
- ident->locallimit = allocate_fps(ident, comm_val, &ident->common, "ID");
+ ident->locallimit = allocate_fps(ident, aggregate, &ident->common, "ID");
ident->last_localweight = ident->localweight;
/* Update other limiters with our weight by writing to comm layer. */
write_local_value(&ident->comm, ident->localweight);
} else {
- ident->locallimit = 0; /* Unused with GRD. */
ident->last_drop_prob = ident->drop_prob;
- ident->drop_prob = allocate_grd(ident, comm_val);
+ ident->drop_prob = allocate_grd(ident, aggregate);
/* Update other limiters with our rate by writing to comm layer. */
write_local_value(&ident->comm, ident->common.rate);
}
/* Make the call to tc. */
-#ifdef DELAY40MS
- snprintf(cmd, CMD_BUFFER_SIZE,
- "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 40ms",
- ident->leaves[i]->xid, ident->leaves[i]->xid,
- (100 * ident->leaves[i]->drop_prob));
-#else
snprintf(cmd, CMD_BUFFER_SIZE,
- "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 0ms",
+ "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay %dms",
ident->leaves[i]->xid, ident->leaves[i]->xid,
- (100 * ident->leaves[i]->drop_prob));
-#endif
+ (100 * ident->leaves[i]->drop_prob), ident->leaves[i]->delay);
+
if (do_enforcement) {
ret = system(cmd);