1 /* See the DRL-LICENSE file for this file's software license. */
4 * Thread to periodically calculate the estimated local limits
5 * Barath Raghavan 2006/2007
12 /** The size of the buffer we use to hold tc commands. */
13 #define CMD_BUFFER_SIZE 200
16 #include "raterouter.h"
18 #include "ratetypes.h" /* needs util and pthread.h */
/* Value printcounter is reset to after a periodic stats line is emitted. */
21 #define PRINT_COUNTER_RESET (0)
/* Current logging verbosity; defined in another translation unit. */
23 extern uint8_t system_loglevel;
/* Countdown (decremented elsewhere) until the next periodic stats line.
 * Starts one below the reset value, so the first interval behaves
 * differently from subsequent ones -- see allocate_fps() and enforce(). */
24 static int printcounter = PRINT_COUNTER_RESET - 1;
/* When non-zero, enforce() actually runs the generated tc commands;
 * when zero they are only built/logged (dry-run).  NOTE(review): the
 * actual system() call sites are elided from this view -- confirm. */
26 uint8_t do_enforcement = 0;
29 * Called for each identity each estimate interval. Uses flow table information
30 * to estimate the current aggregate rate and the rate of the individual flows
33 static void estimate(identity_t *ident) {
    /* NOTE(review): declaration of 'now' (struct timeval, presumably) is on
     * an elided line of this function. */
36 gettimeofday(&now, NULL);
    /* Serialize against clean() and other table-modifying paths. */
38 pthread_mutex_lock(&ident->table_mutex); /* CLUNK ! */
    /* Delegate the actual rate estimation to the identity's table-specific
     * update hook, passing the current time and the EWMA smoothing weight. */
40 ident->table_update_function(ident->table, now, ident->ewma_weight);
42 pthread_mutex_unlock(&ident->table_mutex); /* CLINK ! */
46 * Determines the FPS weight allocation when the identity is under its current
49 static double allocate_fps_under_limit(identity_t *ident, uint32_t local_rate, double peer_weights) {
    /* Start from the observed local rate; may be inflated below during
     * flow-start to let a newly active limiter ramp up quickly. */
50 uint32_t target = local_rate;
    /* Total system weight = everyone else's weights + our previous weight. */
52 double total_weight = peer_weights + ident->last_localweight;
54 if (ident->flowstart) {
    /* Flow-start mode: aim for 4x the observed rate to probe upward. */
55 target = local_rate*4;
    /* Leave flow-start once the local rate is clearly established. */
56 if (local_rate >= FLOW_START_THRESHOLD) {
57 ident->flowstart = false;
61 /* June 16, 2008 (KCW)
62 * ident->flowstart gets set initially to one, but it is never set again. However,
63 * if a limiter gets flows and then the number of flows drops to zero, it has trouble
64 * increasing the limit again. */
    /* Re-enter flow-start when the rate collapses below the threshold. */
65 if (local_rate < FLOW_START_THRESHOLD) {
66 ident->flowstart = true;
    /* Weight selection.  NOTE(review): several else/brace lines between the
     * cases below are elided from this view; the visible alternatives are: */
70 if (target >= ident->limit) {
    /* Want the whole limit: claim the entire system weight. */
71 ideal_weight = total_weight;
72 } else if (target <= 0) {
73 ideal_weight = 0; // no flows here
    /* Proportional share: fraction of the limit we want, scaled by the
     * total weight in the system. */
75 ideal_weight = ((double)target / (double)ident->limit) * total_weight;
79 else if (peer_weights <= 0) {
81 // doesn't matter what we pick as our weight, so pick 1 / N.
82 ideal_weight = MAX_FLOW_SCALING_FACTOR / (remote_count(ident->i_handle) + 1);
84 ideal_weight = ((double)target / (double)ident->limit) * total_weight;
    /* Alternate formulation: solve w / (w + peer_weights) = target / limit
     * for w, giving w = target * peer_weights / (limit - target). */
87 double divisor = (double) ident->limit - (double) target;
88 ideal_weight = ((double) target * peer_weights) / divisor;
90 ideal_weight = ((double)target / (double)ident->limit) * total_weight;
99 * Determines the FPS weight allocation when the identity is over its current
102 static double allocate_fps_over_limit(identity_t *ident) {
    /* Over-limit regime: derive the weight from flow counting -- the local
     * limit divided by the largest single flow's rate approximates the
     * number of max-rate flows that fit under the limit. */
105 if (ident->common.max_flow_rate > 0) {
106 ideal_weight = (double) ident->locallimit / (double) ident->common.max_flow_rate;
108 printlog(LOG_DEBUG, "%.3f %d %d %d FlowCount, Limit, MaxRate, TotalRate\n",
109 ideal_weight, ident->locallimit, ident->common.max_flow_rate, ident->common.rate);
118 * Determines the amount of FPS weight to allocate to the identity during each
119 * estimate interval. Note that total_weight includes local weight.
121 static uint32_t allocate_fps(identity_t *ident, double total_weight) {
122 common_accounting_t *ftable = &ident->common; /* Common flow table info */
123 uint32_t local_rate = ftable->rate;
124 uint32_t ideallocal = 0;
125 double peer_weights; /* sum of weights of all other limiters */
126 double idealweight = 0;
127 double last_portion = 0;
128 double this_portion = 0;
    /* 'dampen' persists across calls: tracks whether the previous interval's
     * large increase was already damped once. */
130 static int dampen = 0;
131 int dampen_increase = 0;
    /* Candidate weights from the two regimes, kept for debug logging. */
133 double ideal_under = 0;
134 double ideal_over = 0;
    /* Two regimes, chosen below:
139 1. the aggregate is < limit
140 2. the aggregate is >= limit */
    /* total_weight (the comm-layer aggregate) includes our own last weight;
     * subtract it to get the peers' share.  Clamp below (elided) if the
     * estimate went negative. */
142 peer_weights = total_weight - ident->last_localweight;
143 if (peer_weights < 0) {
    /* Spike detection: if the instantaneous rate jumped by more than
     * LARGE_INCREASE_PERCENTAGE/10 of the limit in one interval, flag it so
     * the increase is damped.  NOTE(review): the declarations of rate_delta
     * and threshold are on elided lines. */
149 (int64_t) ftable->inst_rate - (int64_t) ftable->last_inst_rate;
151 (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
153 if (rate_delta > threshold) {
155 printlog(LOG_DEBUG, "DAMPEN: delta(%.3f) thresh(%.3f)\n",
156 rate_delta, threshold);
    /* Regime selection. */
160 if (local_rate <= 0) {
162 } else if (dampen_increase == 0 &&
163 (ident->locallimit <= 0 || local_rate < (ident->locallimit * CLOSE_ENOUGH) || ident->flowstart)) {
164 /* We're under the limit - all flows are bottlenecked. */
165 idealweight = allocate_fps_under_limit(ident, local_rate, peer_weights);
166 ideal_over = allocate_fps_over_limit(ident);
167 ideal_under = idealweight;
    /* Take the minimum of the two estimates (see MIN log line below). */
169 if (ideal_over < idealweight) {
170 idealweight = ideal_over;
    /* EWMA-smooth the new weight toward the ideal. */
179 ident->localweight = (ident->localweight * ident->ewma_weight +
180 idealweight * (1 - ident->ewma_weight));
    /* Over-limit (or damped) regime: weight from flow counting. */
183 idealweight = allocate_fps_over_limit(ident);
186 ident->localweight = (ident->localweight * ident->ewma_weight +
187 idealweight * (1 - ident->ewma_weight));
189 /* This is the portion of the total weight in the system that was caused
190 * by this limiter in the last interval. */
191 last_portion = ident->last_localweight / total_weight;
193 /* This is the fraction of the total weight in the system that our
194 * proposed value for idealweight would use. */
195 this_portion = ident->localweight / (peer_weights + ident->localweight);
197 /* Dampen the large increase the first time... */
198 if (dampen == 0 && (this_portion - last_portion > LARGE_INCREASE_PERCENTAGE)) {
199 ident->localweight = ident->last_localweight + (LARGE_INCREASE_PERCENTAGE * total_weight);
205 ideal_under = allocate_fps_under_limit(ident, local_rate, peer_weights);
206 ideal_over = idealweight;
211 /* Convert weight into a rate - add in our new local weight */
212 ident->total_weight = total_weight = ident->localweight + peer_weights;
214 /* compute local allocation:
215 if there is traffic elsewhere, use the weights
216 otherwise do a L/n allocation */
217 if (total_weight > 0) {
218 //if (peer_weights > 0) {
219 ideallocal = (uint32_t) (ident->localweight * ident->limit / total_weight);
    /* No weight anywhere: split the limit evenly across the N+1 limiters. */
221 ideallocal = ident->limit / (ident->comm.remote_node_count + 1);
224 printlog(LOG_DEBUG, "%.3f ActualWeight\n", ident->localweight);
226 printlog(LOG_DEBUG, "%.3f %.3f %.3f %.3f Under / Over / Actual / Rate\n",
227 ideal_under / (ideal_under + peer_weights),
228 ideal_over / (ideal_over + peer_weights),
229 ident->localweight / (ident->localweight + peer_weights),
230 (double) local_rate / (double) ident->limit);
232 printlog(LOG_DEBUG, "%.3f %.3f IdealUnd IdealOve\n",ideal_under,ideal_over);
234 if (system_loglevel == LOG_DEBUG) {
235 printf("local_rate: %d, idealweight: %.3f, localweight: %.3f, total_weight: %.3f\n",
236 local_rate, idealweight, ident->localweight, total_weight);
    /* Periodic one-line stats dump at WARN level.  NOTE(review): the
     * declarations of tv and time_now are on elided lines. */
239 if (printcounter <= 0) {
243 gettimeofday(&tv, NULL);
244 time_now = (double) tv.tv_sec + (double) ((double) tv.tv_usec / (double) 1000000);
246 printlog(LOG_WARN, "%.2f %d %.2f %.2f %.2f %d %d %d %d %d %d %d %d", time_now, ftable->inst_rate,
247 idealweight, ident->localweight, total_weight, ftable->num_flows, ftable->num_flows_5k,
248 ftable->num_flows_10k, ftable->num_flows_20k, ftable->num_flows_50k, ftable->avg_rate,
249 ftable->max_flow_rate, ftable->max_flow_rate_flow_hash);
251 printcounter = PRINT_COUNTER_RESET;
256 //printf("Dampen: %d, dampen_increase: %d, peer_weights: %.3f, regime: %d\n",
257 // dampen, dampen_increase, peer_weights, regime);
260 printlog(LOG_DEBUG, "MIN: min said to use flow counting, which was %.3f when other method said %.3f.\n",
261 ideal_over, ideal_under);
264 printlog(LOG_DEBUG, "ideallocal is %d\n", ideallocal);
270 * Determines the local drop probability for a GRD identity every estimate
273 static double allocate_grd(identity_t *ident, double aggdemand) {
    /* NOTE(review): declaration of dropprob is on an elided line. */
275 double global_limit = (double) (ident->limit);
    /* Drop exactly the fraction of demand that exceeds the global limit;
     * (the zero-drop branch for aggdemand <= limit is elided). */
277 if (aggdemand > global_limit) {
278 dropprob = (aggdemand-global_limit)/aggdemand;
283 if (system_loglevel == LOG_DEBUG) {
284 printf("local rate: %d, aggregate demand: %.3f, drop prob: %.3f\n",
285 ident->common.rate, aggdemand, dropprob);
292 * Given current estimates of local rate (weight) and remote rates (weights)
293 * use GRD or FPS to calculate a new local limit.
295 static void allocate(limiter_t *limiter, identity_t *ident) {
296 /* Represents aggregate rate for GRD and aggregate weight for FPS. */
299 /* Read comm_val from comm layer. */
300 if (limiter->policy == POLICY_FPS) {
    /* FPS: substitute our average per-node weight for unreachable peers. */
301 read_comm(&ident->comm, &comm_val,
302 ident->total_weight / (double) (ident->comm.remote_node_count + 1));
    /* GRD: substitute an even share of the limit for unreachable peers. */
304 read_comm(&ident->comm, &comm_val,
305 (double) (ident->limit / (double) (ident->comm.remote_node_count + 1)));
307 printlog(LOG_DEBUG, "%.3f Aggregate weight/rate (FPS/GRD)\n", comm_val);
309 /* Experimental printing. */
310 printlog(LOG_DEBUG, "%.3f \t Kbps used rate. ID:%d\n",
311 (double) ident->common.rate / (double) 128, ident->id);
    /* Accumulate bytes for the periodic averages report (print_averages). */
312 ident->avg_bytes += ident->common.rate;
314 if (limiter->policy == POLICY_FPS) {
315 ident->locallimit = allocate_fps(ident, comm_val);
316 ident->last_localweight = ident->localweight;
318 /* Update other limiters with our weight by writing to comm layer. */
319 write_local_value(&ident->comm, ident->localweight);
321 ident->locallimit = 0; /* Unused with GRD. */
322 ident->last_drop_prob = ident->drop_prob;
323 ident->drop_prob = allocate_grd(ident, comm_val);
325 /* Update other limiters with our rate by writing to comm layer. */
326 write_local_value(&ident->comm, ident->common.rate);
329 /* Update identity state. */
330 ident->common.last_rate = ident->common.rate;
334 * Traces all of the parent pointers of a leaf all the way to the root in
335 * order to find the maximum drop probability in the chain.
337 static double find_leaf_drop_prob(leaf_t *leaf) {
    /* Walk from the leaf's immediate parent up the identity hierarchy,
     * tracking the maximum drop_prob seen.  NOTE(review): the declaration
     * and initialization of 'result', and the final return, are on elided
     * lines. */
338 identity_t *current = leaf->parent;
343 while (current != NULL) {
344 if (current->drop_prob > result) {
345 result = current->drop_prob;
347 current = current->parent;
354 * This is called once per estimate interval to enforce the rate that allocate
355 * has decided upon. It makes calls to tc using system().
357 static void enforce(limiter_t *limiter, identity_t *ident) {
358 char cmd[CMD_BUFFER_SIZE];
    /* Two cases below (case labels elided): FPS adjusts a single HTB class
     * ceiling; GRD programs per-leaf netem loss probabilities. */
362 switch (limiter->policy) {
365 /* TC treats limits of 0 (8bit) as unlimited, which causes the
366 * entire rate limiting system to become unpredictable. In
367 * reality, we also don't want any limiter to be able to set its
368 * limit so low that it chokes all of the flows to the point that
369 * they can't increase. Thus, when we're setting a low limit, we
370 * make sure that it isn't too low by using the
371 * FLOW_START_THRESHOLD. */
373 if (ident->locallimit < FLOW_START_THRESHOLD) {
374 ident->locallimit = FLOW_START_THRESHOLD;
377 /* Do not allow the node to set a limit higher than its
378 * administratively assigned upper limit (bwcap). */
379 if (limiter->nodelimit != 0 && ident->locallimit > limiter->nodelimit) {
380 ident->locallimit = limiter->nodelimit;
383 if (system_loglevel == LOG_DEBUG) {
384 printf("FPS: Setting local limit to %d\n", ident->locallimit);
386 printlog(LOG_DEBUG, "%d Limit ID:%d\n", ident->locallimit, ident->id);
    /* Pair the WARN stats line from allocate_fps() with the chosen limit. */
388 if (printcounter == PRINT_COUNTER_RESET) {
389 printlog(LOG_WARN, "%d\n", ident->locallimit);
    /* Build the tc command to retune the identity's HTB class ceiling. */
392 snprintf(cmd, CMD_BUFFER_SIZE,
393 "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %dbps quantum 1600",
394 ident->htb_parent, ident->htb_node, ident->locallimit);
    /* NOTE(review): the system(cmd) call itself is on an elided line. */
396 if (do_enforcement) {
400 /* FIXME: call failed. What to do? */
401 printlog(LOG_CRITICAL, "***TC call failed?***\n");
    /* GRD: push the new drop probability down to every leaf, taking the
     * maximum over all identities that limit each leaf. */
407 for (i = 0; i < ident->leaf_count; ++i) {
408 if (ident->drop_prob >= ident->leaves[i]->drop_prob) {
409 /* The new drop probability for this identity is greater
410 * than or equal to the leaf's current drop probability.
411 * We can safely use the larger value at this leaf
413 ident->leaves[i]->drop_prob = ident->drop_prob;
414 } else if (ident->last_drop_prob < ident->leaves[i]->drop_prob) {
415 /* The old drop probability for this identity is less than
416 * the leaf's current drop probability. This means that
417 * this identity couldn't have been the limiting ident,
418 * so nothing needs to be done because the old limiting
419 * ident is still the limiting factor. */
421 /* Intentionally blank. */
423 /* If neither of the above are true, then...
424 * 1) The new drop probability for the identity is less
425 * than what it previously was, and
426 * 2) This ident may have had the maximum drop probability
427 * of all idents limiting this leaf, and therefore we need
428 * to follow the leaf's parents up to the root to find the
429 * new leaf drop probability safely. */
430 ident->leaves[i]->drop_prob =
431 find_leaf_drop_prob(ident->leaves[i]);
434 /* Make the call to tc. */
    /* Two variants (selection condition elided): 40ms vs 0ms added delay. */
436 snprintf(cmd, CMD_BUFFER_SIZE,
437 "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 40ms",
438 ident->leaves[i]->xid, ident->leaves[i]->xid,
439 (100 * ident->leaves[i]->drop_prob));
441 snprintf(cmd, CMD_BUFFER_SIZE,
442 "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 0ms",
443 ident->leaves[i]->xid, ident->leaves[i]->xid,
444 (100 * ident->leaves[i]->drop_prob));
446 if (do_enforcement) {
450 /* FIXME: call failed. What to do? */
451 printlog(LOG_CRITICAL, "***TC call failed?***\n");
    /* default: unknown policy value. */
459 printlog(LOG_CRITICAL, "DRL enforce: unknown policy %d\n",limiter->policy);
467 * This function is periodically called to clean the stable instance's flow
468 * accounting tables for each identity.
470 static void clean(drl_instance_t *instance) {
471 identity_t *ident = NULL;
    /* Iterate every identity in the instance's map and run its
     * table-specific cleanup hook under the per-identity table mutex
     * (same lock discipline as estimate()). */
473 map_reset_iterate(instance->ident_map);
474 while ((ident = map_next(instance->ident_map)) != NULL) {
475 pthread_mutex_lock(&ident->table_mutex);
477 ident->table_cleanup_function(ident->table);
479 pthread_mutex_unlock(&ident->table_mutex);
482 /* Periodically flush the log file. */
486 static void print_averages(drl_instance_t *instance, int print_interval) {
487 identity_t *ident = NULL;
    /* For each identity: convert the accumulated byte count into a
     * per-interval average, log it (the /128 matches the Kbps conversion
     * used in allocate()), and reset the accumulator. */
489 map_reset_iterate(instance->ident_map);
490 while ((ident = map_next(instance->ident_map)) != NULL) {
491 ident->avg_bytes /= (double) print_interval;
492 //printf("avg_bytes = %f, print_interval = %d\n", ident->avg_bytes, print_interval);
493 printlog(LOG_DEBUG, "%.3f \t Avg rate. ID:%d\n",
494 ident->avg_bytes / 128, ident->id);
495 //printf("%.3f \t Avg rate. ID:%d\n",
496 // ident->avg_bytes / 128, ident->id);
497 ident->avg_bytes = 0;
501 /** Thread function to handle local rate estimation.
503 * None of our simple hashmap functions are thread safe, so we lock the limiter
504 * with an rwlock to prevent another thread from attempting to modify the set
507 * Each identity also has a private lock for its table. This gets locked by
508 * table-modifying functions such as estimate and clean.
510 void handle_estimation(void *arg) {
511 limiter_t *limiter = (limiter_t *) arg;
512 identity_t *ident = NULL;
513 int clean_timer, clean_wait_intervals;
    /* estintms is the estimate interval in milliseconds; convert to usec
     * for the per-iteration sleep (usleep call is on an elided line). */
514 useconds_t sleep_time = limiter->estintms * 1000;
515 uint32_t cal_slot = 0;
    /* Number of estimate intervals per second, used to pace the DEBUG
     * averages report. */
516 int print_interval = 1000 / (limiter->estintms);
518 sigset_t signal_mask;
    /* Block SIGHUP/SIGUSR1 in this thread so they are delivered to the
     * thread that handles them instead. */
520 sigemptyset(&signal_mask);
521 sigaddset(&signal_mask, SIGHUP);
522 sigaddset(&signal_mask, SIGUSR1);
523 pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
525 /* Determine the number of intervals we should wait before hitting the
526 * specified clean interval. (Converts seconds -> intervals). */
527 clean_wait_intervals = IDENT_CLEAN_INTERVAL * (1000.0 / limiter->estintms);
528 clean_timer = clean_wait_intervals;
    /* Main loop (loop construct on an elided line): one iteration per
     * estimate interval. */
531 /* Sleep according to the delay of the estimate interval. */
534 /* Grab the limiter lock for reading. This prevents identities from
535 * disappearing beneath our feet. */
536 pthread_rwlock_rdlock(&limiter->limiter_lock);
    /* The calendar queue is indexed by slot; SCHEDMASK wraps the counter. */
538 cal_slot = limiter->stable_instance.cal_slot & SCHEDMASK;
540 /* Service all the identities that are scheduled to run during this
542 while (!TAILQ_EMPTY(limiter->stable_instance.cal + cal_slot)) {
543 ident = TAILQ_FIRST(limiter->stable_instance.cal + cal_slot);
544 TAILQ_REMOVE(limiter->stable_instance.cal + cal_slot, ident, calendar);
546 /* Update the ident's flow accouting table with the latest info. */
549 /* Determine its share of the rate allocation. */
550 allocate(limiter, ident);
552 /* Make tc calls to enforce the rate we decided upon. */
553 enforce(limiter, ident);
555 /* Tell the comm library to propagate this identity's result for
557 send_update(&ident->comm, ident->id);
559 /* Add ident back to the queue at a future time slot. */
560 TAILQ_INSERT_TAIL(limiter->stable_instance.cal +
561 ((cal_slot + ident->intervals) & SCHEDMASK),
    /* Periodic DEBUG-level averages report, roughly once per second. */
566 if (loglevel() == LOG_DEBUG && print_interval <= 0) {
567 print_interval = 1000 / (limiter->estintms);
568 print_averages(&limiter->stable_instance, print_interval);
571 /* Check if enough intervals have passed for cleaning. */
572 if (clean_timer <= 0) {
573 clean(&limiter->stable_instance);
574 clean_timer = clean_wait_intervals;
    /* Advance to the next calendar slot for the next interval. */
579 limiter->stable_instance.cal_slot += 1;
581 pthread_rwlock_unlock(&limiter->limiter_lock);