1 /* See the DRL-LICENSE file for this file's software license. */
4 * Thread to periodically calculate the estimated local limits
5 * Barath Raghavan 2006/2007
12 /** The size of the buffer we use to hold tc commands. */
13 #define CMD_BUFFER_SIZE 200
16 #include "raterouter.h"
18 #include "ratetypes.h" /* needs util and pthread.h */
// Process-wide log level, defined in another translation unit. This file
// compares it against LOG_DEBUG to decide whether to also printf statistics.
22 extern uint8_t system_loglevel;
// Flag tested by enforce() after it formats each tc command; starts at 0.
// NOTE(review): presumably set elsewhere to turn on the tc calls -- confirm.
24 uint8_t do_enforcement = 0;
27 * Called for each identity each estimate interval. Uses flow table information
28 * to estimate the current aggregate rate and the rate of the individual flows
// Refreshes the flow accounting of one identity. While holding the identity
// table mutex it measures the time since ident->common.last_update, warns if
// that gap overshoots the scheduled interval by more than 5%, and then runs
// the table update callback of the identity with the current time.
//   ident    - identity whose table is updated.
//   estintms - length of one estimate interval in milliseconds.
31 static void estimate(identity_t *ident, const double estintms) {
33 double time_difference;
// Hold the per-identity table lock until the unlock at the end.
35 pthread_mutex_lock(&ident->table_mutex); /* CLUNK ! */
37 gettimeofday(&now, NULL);
// NOTE(review): the result is compared against a value in seconds and scaled
// by 1000 for the ms log field, so timeval_subtract presumably returns
// seconds -- confirm.
39 time_difference = timeval_subtract(now, ident->common.last_update);
// Expected gap is estintms * mainloop_intervals (in ms); tolerate 5% slack.
41 if (time_difference > 1.05 * (estintms / 1000 * ident->mainloop_intervals)) {
42 printlog(LOG_WARN, "Missed interval: Scheduled for %.2f ms, actual %.2fms\n",
43 estintms * ident->mainloop_intervals, time_difference * 1000);
// Delegate to whichever accounting implementation this identity was given.
46 ident->table_update_function(ident->table, now, ident->ewma_weight);
// Also refresh the shadow table through the standard flow table update.
// NOTE(review): confirm whether this call is guarded by SHADOW_ACCTING as the
// matching cleanup call in clean() is.
50 standard_table_update_flows((standard_flow_table) ident->shadow_table, now,
55 pthread_mutex_unlock(&ident->table_mutex); /* CLINK ! */
59 * Determines the FPS weight allocation when the identity is under its current
// Computes the FPS weight for an identity that is running below its limit.
//   ident        - identity being sized; limit and last_localweight are read.
//   target       - rate the identity should be sized for.
//   peer_weights - sum of the weights of the other limiters.
// NOTE(review): several alternative formulas assign ideal_weight below; confirm
// which of them is active in the build before changing any one of them.
62 static double allocate_fps_under_limit(identity_t *ident, uint32_t target, double peer_weights) {
// Total weight as of the previous interval: peers plus our own last weight.
64 double total_weight = peer_weights + ident->last_localweight;
// Target at or above the limit: claim the entire weight.
66 if (target >= ident->limit) {
67 ideal_weight = total_weight;
// target is unsigned, so this test only matches target == 0.
68 } else if (target <= 0) {
69 ideal_weight = 0; // no flows here
// Otherwise take the target / limit fraction of the total weight.
71 ideal_weight = ((double)target / (double)ident->limit) * total_weight;
75 else if (peer_weights <= 0) {
77 // doesn't matter what we pick as our weight, so pick 1 / N.
// N is remote_count() + 1; the share is scaled by MAX_FLOW_SCALING_FACTOR.
78 ideal_weight = MAX_FLOW_SCALING_FACTOR / (remote_count(ident->i_handle) + 1);
80 ideal_weight = ((double)target / (double)ident->limit) * total_weight;
// Solves w / (w + peer_weights) == target / limit for w.
83 double divisor = (double) ident->limit - (double) target;
84 ideal_weight = ((double) target * peer_weights) / divisor;
86 ideal_weight = ((double)target / (double)ident->limit) * total_weight;
95 * Determines the FPS weight allocation when the identity is over its current
// Computes the aggressive FPS weight used at or over the limit: the current
// local limit divided by common.max_flow_rate (presumably the rate of the
// fastest flow -- confirm against the flow table code).
98 static double allocate_fps_over_limit(identity_t *ident) {
100 double total_over_max;
// Both divisions below are guarded against a zero max_flow_rate.
102 if (ident->common.max_flow_rate > 0) {
103 ideal_weight = (double) ident->locallimit / (double) ident->common.max_flow_rate;
// Aggregate rate expressed in units of max_flow_rate; reported in the log line.
104 total_over_max = (double) ident->common.rate / (double) ident->common.max_flow_rate;
106 printlog(LOG_DEBUG, "ideal_over: %.3f, limit: %d, max_flow_rate: %d, total_rate: %d, total/max: %.3f\n",
107 ideal_weight, ident->locallimit, ident->common.max_flow_rate, ident->common.rate, total_over_max);
116 * When FPS checks to see which mode it should be operating in
117 * (over limit vs under limit), we don't want it to actually look to
118 * see if we're at the limit. Instead, we want to see if we're getting
119 * close to the limit. This defines how close is "close enough".
121 * For example, if the limit is 50000 and we're sending 49000, we probably
122 * want to be in the over limit mode, even if we aren't actually over the limit
123 * in order to switch to the more aggressive weight calculations.
// Returns the rate threshold at which a limiter with the given limit is
// treated as being at its limit: limit * CLOSE_ENOUGH, unless that leaves a
// gap smaller than 2500, in which case the threshold is limit - 2500.
// NOTE(review): limit - 2500 is unsigned arithmetic, so for limit < 2500 it
// wraps to a very large value and every rate compares as below the threshold.
// allocate_fps_old() checks locallimit <= 0 before calling this; confirm that
// other callers tolerate the wrap before changing it.
125 static inline uint32_t close_enough(uint32_t limit) {
// Gap between the limit and the scaled threshold.
126 uint32_t difference = limit - (limit * CLOSE_ENOUGH);
// Keep the threshold at least 2500 below the limit.
128 if (difference < 2500) {
129 return (limit - 2500);
131 return (limit * CLOSE_ENOUGH);
// Emits one statistics record for an identity at LOG_WARN level, and in debug
// mode also prints a short summary to stdout.
//   ident           - identity the record belongs to (its id is logged).
//   ideal_weight    - weight computed for this interval before smoothing.
//   total_weight    - total weight including the local weight.
//   localweight     - smoothed local weight to report.
//   identifier      - label printed in front of the identity id.
//   table           - accounting data the rate and flow counters come from.
//   resulting_limit - limit derived from the weights.
135 static void print_statistics(identity_t *ident, const double ideal_weight,
136 const double total_weight, const double localweight,
137 const char *identifier, common_accounting_t *table,
138 const uint32_t resulting_limit) {
142 gettimeofday(&tv, NULL);
// Wall-clock timestamp as fractional seconds.
143 time_now = (double) tv.tv_sec + (double) ((double) tv.tv_usec / (double) 1000000);
// Record layout: time, inst_rate, ideal/local/total weight, flow counters,
// avg_rate, max_flow_rate and its flow hash, resulting limit, identifier:id.
145 printlog(LOG_WARN, "%.2f %d %.2f %.2f %.2f %d %d %d %d %d %d %d %d %d %s:%d ",
146 time_now, table->inst_rate, ideal_weight, localweight, total_weight,
147 table->num_flows, table->num_flows_5k, table->num_flows_10k,
148 table->num_flows_20k, table->num_flows_50k, table->avg_rate,
149 table->max_flow_rate, table->max_flow_rate_flow_hash, resulting_limit,
150 identifier, ident->id);
// Last column closes the record: rate / max_flow_rate, or 0 when
// max_flow_rate is not positive.
152 if (table->max_flow_rate > 0) {
153 printlog(LOG_WARN, "%.3f\n", (double) table->rate / (double) table->max_flow_rate);
155 printlog(LOG_WARN, "0\n");
158 /* Print to the screen in debug mode. */
159 if (system_loglevel == LOG_DEBUG) {
// NOTE(review): this prints ident->localweight rather than the localweight
// parameter, so a call made for the shadow table reports the real weight.
160 printf("Local Rate: %d, Ideal Weight: %.3f, Local Weight: %.3f, Total Weight: %.3f\n",
161 table->rate, ideal_weight, ident->localweight, total_weight);
// Computes the new local rate limit of an identity under the FPS policy.
//   ident        - identity to allocate for; localweight, dampen_state and
//                  total_weight are updated.
//   total_weight - aggregate weight reported by the comm layer, including the
//                  local weight of the previous interval.
//   table        - accounting data providing rate, inst_rate, last_inst_rate.
//   identifier   - label forwarded to print_statistics().
// Returns the resulting limit (localweight * limit / total_weight, or an equal
// share of the limit when the total weight is not positive).
165 static uint32_t allocate_fps(identity_t *ident, double total_weight,
166 common_accounting_t *table, const char *identifier) {
168 uint32_t resulting_limit = 0;
169 double ideal_weight = 0.0;
// Weight of the other limiters: aggregate minus our share from last interval.
170 double peer_weights = total_weight - ident->last_localweight;
172 /* Keep track of these for measurements & comparisons only. */
173 double ideal_under = 0.0;
174 double ideal_over = 0.0;
// A negative remainder is special-cased here.
177 if (peer_weights < 0.0) {
// A capped increase was applied last interval. It passes only if the
// instantaneous rate grew by more than limit * LARGE_INCREASE_PERCENTAGE / 10.
181 if (ident->dampen_state == DAMPEN_TEST) {
182 int64_t rate_delta = (int64_t) table->inst_rate - (int64_t) table->last_inst_rate;
183 double threshold = (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
185 if (rate_delta > threshold) {
186 ident->dampen_state = DAMPEN_PASSED;
188 ident->dampen_state = DAMPEN_FAILED;
192 /* Rate/weight sanity. */
193 if (table->rate <= 0) {
197 /* Under the limit OR we failed our dampening test OR our current
198 * outgoing traffic rate is under the low "flowstart" watermark. */
199 else if (ident->dampen_state == DAMPEN_FAILED ||
200 table->rate < close_enough(ident->locallimit)) {
// NOTE(review): the flowstart clause below cannot be live together with the
// condition above; it looks like a disabled alternative -- confirm.
202 || ident->flowstart) {
203 uint32_t target_rate = table->rate;
205 if (ident->flowstart) {
208 if (table->rate >= FLOW_START_THRESHOLD) {
209 ident->flowstart = false;
212 /* June 16, 2008 (KCW)
213 * ident->flowstart gets set initially to one, but it is never set again. However,
214 * if a limiter gets flows and then the number of flows drops to zero, it has trouble
215 * increasing the limit again. */
216 if (table->rate < FLOW_START_THRESHOLD) {
217 ident->flowstart = true;
223 /* Boost low-limits so that they have room to grow. */
// Below the threshold the under-limit weight is sized for 4x the rate.
224 if (table->rate < FLOW_START_THRESHOLD) {
225 ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate * 4, peer_weights);
227 ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights);
230 ideal_over = allocate_fps_over_limit(ident);
232 if (ideal_over < ideal_under) {
233 /* Degenerate case in which the aggressive weight calculation was
234 * actually less than the under-the-limit case. Use it instead
235 * and skip the dampening check in the next interval. */
236 ideal_weight = ideal_over;
237 ident->dampen_state = DAMPEN_SKIP;
239 ident->dampen_state = DAMPEN_NONE;
// EWMA smoothing: ewma_weight is the fraction kept from the previous weight.
243 ident->localweight = (ident->localweight * ident->ewma_weight +
244 ideal_weight * (1 - ident->ewma_weight));
247 /* At or over the limit. Use the aggressive weight calculation. */
249 double portion_last_interval = 0.0;
250 double portion_this_interval = 0.0;
// ideal_under is computed here as well but ideal_weight takes the over value.
252 ideal_weight = ideal_over = allocate_fps_over_limit(ident);
253 ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights);
// Same EWMA smoothing as in the under-limit branch.
256 ident->localweight = (ident->localweight * ident->ewma_weight +
257 ideal_weight * (1 - ident->ewma_weight));
259 /* Now check whether the result of the aggressive weight calculation
260 * increases our portion of the weight "too much", in which case we
263 /* Our portion of weight in the whole system during the last interval.*/
// NOTE(review): no guard against total_weight == 0 is visible before this
// division -- confirm.
264 portion_last_interval = ident->last_localweight / total_weight;
266 /* Our proposed portion of weight for the current interval. */
267 portion_this_interval = ident->localweight / (peer_weights + ident->localweight);
// A first large jump in our share is capped at LARGE_INCREASE_PERCENTAGE of
// the total weight and marked DAMPEN_TEST so the next interval verifies it.
269 if (ident->dampen_state == DAMPEN_NONE &&
270 (portion_this_interval - portion_last_interval > LARGE_INCREASE_PERCENTAGE)) {
271 ident->localweight = ident->last_localweight + (LARGE_INCREASE_PERCENTAGE * total_weight);
272 ident->dampen_state = DAMPEN_TEST;
274 ident->dampen_state = DAMPEN_SKIP;
278 /* Add the weight calculated in this interval to the total. */
279 ident->total_weight = total_weight = ident->localweight + peer_weights;
281 /* Convert weight value into a rate limit. If there is no measurable
282 * weight, do a L/n allocation. */
283 if (total_weight > 0) {
284 resulting_limit = (uint32_t) (ident->localweight * ident->limit / total_weight);
// n is the number of remote nodes plus this one.
286 resulting_limit = (uint32_t) (ident->limit / (ident->comm.remote_node_count + 1));
289 print_statistics(ident, ideal_weight, total_weight, ident->localweight,
290 identifier, table, resulting_limit);
292 return resulting_limit;
295 #ifdef SHADOW_ACCTING
297 /* Runs through the allocate functionality without making any state changes to
298 * the identity. Useful for comparisons, especially for comparing standard
299 * and sample&hold accounting schemes. */
// Shadow variant of allocate_fps(): runs the same weight calculation against
// another accounting table and stores its results in the _copy fields of the
// identity (dampen_state_copy, localweight_copy) instead of the live ones.
// The resulting limit is only passed to print_statistics(), not returned.
//   ident        - identity whose _copy fields are read and written.
//   total_weight - aggregate weight reported by the comm layer.
//   table        - accounting data to evaluate.
//   identifier   - label forwarded to print_statistics().
300 static void allocate_fps_pretend(identity_t *ident, double total_weight,
301 common_accounting_t *table, const char *identifier) {
303 uint32_t resulting_limit = 0;
304 double ideal_weight = 0.0;
// Peer weight is derived from the shadow copy of the last local weight.
305 double peer_weights = total_weight - ident->last_localweight_copy;
306 double ideal_under = 0.0;
307 double ideal_over = 0.0;
// A negative remainder is special-cased here.
309 if (peer_weights < 0.0) {
// Same dampening test as allocate_fps(), tracked in dampen_state_copy.
313 if (ident->dampen_state_copy == DAMPEN_TEST) {
314 int64_t rate_delta = (int64_t) table->inst_rate - (int64_t) table->last_inst_rate;
315 double threshold = (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
317 if (rate_delta > threshold) {
318 ident->dampen_state_copy = DAMPEN_PASSED;
320 ident->dampen_state_copy = DAMPEN_FAILED;
324 /* Rate/weight sanity. */
325 if (table->rate <= 0) {
329 /* Under the limit OR we failed our dampening test OR our current
330 * outgoing traffic rate is under the low "flowstart" watermark. */
// The comparison uses the live locallimit, not a shadow value.
331 else if (ident->dampen_state_copy == DAMPEN_FAILED ||
332 table->rate < close_enough(ident->locallimit)) {
334 /* Boost low-limits so that they have room to grow. */
335 if (table->rate < FLOW_START_THRESHOLD) {
336 ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate * 4, peer_weights);
338 ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights);
341 ideal_over = allocate_fps_over_limit(ident);
343 if (ideal_over < ideal_under) {
344 /* Degenerate case in which the aggressive weight calculation was
345 * actually less than the under-the-limit case. Use it instead
346 * and skip the dampening check in the next interval. */
347 ideal_weight = ideal_over;
348 ident->dampen_state_copy = DAMPEN_SKIP;
350 ident->dampen_state_copy = DAMPEN_NONE;
// EWMA smoothing of the shadow weight.
354 ident->localweight_copy = (ident->localweight_copy * ident->ewma_weight +
355 ideal_weight * (1 - ident->ewma_weight));
358 /* At or over the limit. Use the aggressive weight calculation. */
360 double portion_last_interval = 0.0;
361 double portion_this_interval = 0.0;
363 ideal_weight = ideal_over = allocate_fps_over_limit(ident);
364 ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights);
367 ident->localweight_copy = (ident->localweight_copy * ident->ewma_weight +
368 ideal_weight * (1 - ident->ewma_weight));
370 /* Now check whether the result of the aggressive weight calculation
371 * increases our portion of the weight "too much", in which case we
374 /* Our portion of weight in the whole system during the last interval.*/
// NOTE(review): this reads last_localweight although peer_weights above was
// derived from last_localweight_copy -- confirm which one is intended.
375 portion_last_interval = ident->last_localweight / total_weight;
377 /* Our proposed portion of weight for the current interval. */
378 portion_this_interval = ident->localweight_copy / (peer_weights + ident->localweight_copy);
380 if (ident->dampen_state_copy == DAMPEN_NONE &&
381 (portion_this_interval - portion_last_interval > LARGE_INCREASE_PERCENTAGE)) {
// NOTE(review): same mix of last_localweight and the _copy fields as above.
382 ident->localweight_copy = ident->last_localweight + (LARGE_INCREASE_PERCENTAGE * total_weight);
383 ident->dampen_state_copy = DAMPEN_TEST;
385 ident->dampen_state_copy = DAMPEN_SKIP;
389 /* Add the weight calculated in this interval to the total. */
// Unlike allocate_fps(), ident->total_weight is not written here.
390 total_weight = ident->localweight_copy + peer_weights;
392 /* Convert weight value into a rate limit. If there is no measurable
393 * weight, do a L/n allocation. */
394 if (total_weight > 0) {
395 resulting_limit = (uint32_t) (ident->localweight_copy * ident->limit / total_weight);
397 resulting_limit = (uint32_t) (ident->limit / (ident->comm.remote_node_count + 1));
400 print_statistics(ident, ideal_weight, total_weight, ident->localweight_copy,
401 identifier, table, resulting_limit);
407 * Determines the amount of FPS weight to allocate to the identity during each
408 * estimate interval. Note that total_weight includes local weight.
// Earlier variant of the FPS allocation that always works on ident->common.
// It updates ident->localweight and ident->total_weight and derives the local
// limit in ideallocal.
// NOTE(review): no call to this function appears in the rest of this file
// section; confirm whether it is still compiled in.
410 static uint32_t allocate_fps_old(identity_t *ident, double total_weight) {
411 common_accounting_t *ftable = &ident->common; /* Common flow table info */
412 uint32_t local_rate = ftable->rate;
413 uint32_t ideallocal = 0;
414 double peer_weights; /* sum of weights of all other limiters */
415 double idealweight = 0;
416 double last_portion = 0;
417 double this_portion = 0;
// Function-static, so this dampening flag is shared by every identity that
// goes through this function.
419 static int dampen = 0;
420 int dampen_increase = 0;
422 double ideal_under = 0;
423 double ideal_over = 0;
428 1. the aggregate is < limit
429 2. the aggregate is >= limit
431 peer_weights = total_weight - ident->last_localweight;
432 if (peer_weights < 0) {
// Growth of the instantaneous rate, compared against
// limit * LARGE_INCREASE_PERCENTAGE / 10.
438 (int64_t) ftable->inst_rate - (int64_t) ftable->last_inst_rate;
440 (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
442 if (rate_delta > threshold) {
444 printlog(LOG_DEBUG, "DAMPEN: delta(%.3f) thresh(%.3f)\n",
445 rate_delta, threshold);
449 if (local_rate <= 0) {
// locallimit <= 0 is tested before close_enough(), so the short-circuit keeps
// a zero limit from reaching its unsigned subtraction.
451 } else if (dampen_increase == 0 &&
452 (ident->locallimit <= 0 || local_rate < close_enough(ident->locallimit) || ident->flowstart)) {
453 /* We're under the limit - all flows are bottlenecked. */
454 idealweight = allocate_fps_under_limit(ident, local_rate, peer_weights);
455 ideal_over = allocate_fps_over_limit(ident);
456 ideal_under = idealweight;
// Use the smaller of the two candidate weights.
458 if (ideal_over < idealweight) {
459 idealweight = ideal_over;
// EWMA smoothing: ewma_weight is the fraction kept from the previous weight.
468 ident->localweight = (ident->localweight * ident->ewma_weight +
469 idealweight * (1 - ident->ewma_weight));
472 idealweight = allocate_fps_over_limit(ident);
475 ident->localweight = (ident->localweight * ident->ewma_weight +
476 idealweight * (1 - ident->ewma_weight));
478 /* This is the portion of the total weight in the system that was caused
479 * by this limiter in the last interval. */
480 last_portion = ident->last_localweight / total_weight;
482 /* This is the fraction of the total weight in the system that our
483 * proposed value for idealweight would use. */
484 this_portion = ident->localweight / (peer_weights + ident->localweight);
486 /* Dampen the large increase the first time... */
487 if (dampen == 0 && (this_portion - last_portion > LARGE_INCREASE_PERCENTAGE)) {
488 ident->localweight = ident->last_localweight + (LARGE_INCREASE_PERCENTAGE * total_weight);
// Recorded for the comparison logs further down.
494 ideal_under = allocate_fps_under_limit(ident, local_rate, peer_weights);
495 ideal_over = idealweight;
500 /* Convert weight into a rate - add in our new local weight */
501 ident->total_weight = total_weight = ident->localweight + peer_weights;
503 /* compute local allocation:
504 if there is traffic elsewhere, use the weights
505 otherwise do a L/n allocation */
506 if (total_weight > 0) {
507 //if (peer_weights > 0) {
508 ideallocal = (uint32_t) (ident->localweight * ident->limit / total_weight);
510 ideallocal = ident->limit / (ident->comm.remote_node_count + 1);
513 printlog(LOG_DEBUG, "%.3f ActualWeight\n", ident->localweight);
// Shares of the total weight implied by each candidate, next to the share of
// the limit actually used (local_rate / limit).
515 printlog(LOG_DEBUG, "%.3f %.3f %.3f %.3f Under / Over / Actual / Rate\n",
516 ideal_under / (ideal_under + peer_weights),
517 ideal_over / (ideal_over + peer_weights),
518 ident->localweight / (ident->localweight + peer_weights),
519 (double) local_rate / (double) ident->limit);
521 printlog(LOG_DEBUG, "%.3f %.3f IdealUnd IdealOve\n",ideal_under,ideal_over);
523 if (system_loglevel == LOG_DEBUG) {
524 printf("local_rate: %d, idealweight: %.3f, localweight: %.3f, total_weight: %.3f\n",
525 local_rate, idealweight, ident->localweight, total_weight);
// NOTE(review): the printcounter section below carries a pointer to
// print_statistics() and looks superseded by it -- confirm it is disabled.
529 if (printcounter <= 0) {
533 gettimeofday(&tv, NULL);
534 time_now = (double) tv.tv_sec + (double) ((double) tv.tv_usec / (double) 1000000);
536 printlog(LOG_WARN, "%.2f %d %.2f %.2f %.2f %d %d %d %d %d %d %d %d ", time_now, ftable->inst_rate,
537 idealweight, ident->localweight, total_weight, ftable->num_flows, ftable->num_flows_5k,
538 ftable->num_flows_10k, ftable->num_flows_20k, ftable->num_flows_50k, ftable->avg_rate,
539 ftable->max_flow_rate, ftable->max_flow_rate_flow_hash);
541 printcounter = PRINT_COUNTER_RESET;
546 //printf("Dampen: %d, dampen_increase: %d, peer_weights: %.3f, regime: %d\n",
547 // dampen, dampen_increase, peer_weights, regime);
550 printlog(LOG_DEBUG, "MIN: min said to use flow counting, which was %.3f when other method said %.3f.\n",
551 ideal_over, ideal_under);
553 See print_statistics()
556 printlog(LOG_DEBUG, "ideallocal is %d\n", ideallocal);
562 * Determines the local drop probability for a GRD identity every estimate
// Computes the GRD drop probability of an identity from the aggregate demand.
//   ident     - identity whose limit is the global limit.
//   aggdemand - aggregate demand reported by the comm layer.
565 static double allocate_grd(identity_t *ident, double aggdemand) {
567 double global_limit = (double) (ident->limit);
// Demand above the limit: drop the excess fraction of the aggregate demand.
569 if (aggdemand > global_limit) {
570 dropprob = (aggdemand-global_limit)/aggdemand;
575 if (system_loglevel == LOG_DEBUG) {
576 printf("local rate: %d, aggregate demand: %.3f, drop prob: %.3f\n",
577 ident->common.rate, aggdemand, dropprob);
584 * Given current estimates of local rate (weight) and remote rates (weights)
585 * use GRD or FPS to calculate a new local limit.
// Runs one allocation step for an identity: reads the aggregate value from the
// comm layer, computes the new local limit (FPS) or drop probability (GRD),
// publishes the local value back to the comm layer and records the last rate.
//   limiter - supplies the active policy.
//   ident   - identity being allocated.
587 static void allocate(limiter_t *limiter, identity_t *ident) {
588 /* Represents aggregate rate for GRD and aggregate weight for FPS. */
591 /* Read comm_val from comm layer. */
// The third argument is an equal 1 / (N + 1) share of the total weight (FPS)
// or of the limit (GRD). NOTE(review): how read_comm uses it is defined
// elsewhere -- confirm.
592 if (limiter->policy == POLICY_FPS) {
593 read_comm(&ident->comm, &comm_val,
594 ident->total_weight / (double) (ident->comm.remote_node_count + 1));
596 read_comm(&ident->comm, &comm_val,
597 (double) (ident->limit / (double) (ident->comm.remote_node_count + 1)));
599 printlog(LOG_DEBUG, "%.3f Aggregate weight/rate (FPS/GRD)\n", comm_val);
601 /* Experimental printing. */
// NOTE(review): dividing by 128 yields Kbit/s only if rate is in bytes per
// second (x8 / 1024) -- confirm the unit of common.rate.
602 printlog(LOG_DEBUG, "%.3f \t Kbps used rate. ID:%d\n",
603 (double) ident->common.rate / (double) 128, ident->id);
// Accumulated per interval; print_averages() divides and resets it.
604 ident->avg_bytes += ident->common.rate;
606 if (limiter->policy == POLICY_FPS) {
607 #ifdef SHADOW_ACCTING
// Evaluate the shadow accounting table first; it only touches _copy state.
609 allocate_fps_pretend(ident, comm_val, &ident->shadow_common, "SHADOW-ID");
611 ident->last_localweight_copy = ident->localweight_copy;
// Real allocation; remember the weight for the next interval.
614 ident->locallimit = allocate_fps(ident, comm_val, &ident->common, "ID");
615 ident->last_localweight = ident->localweight;
617 /* Update other limiters with our weight by writing to comm layer. */
618 write_local_value(&ident->comm, ident->localweight);
620 ident->locallimit = 0; /* Unused with GRD. */
// Keep the previous probability; enforce() compares it with the leaf values.
621 ident->last_drop_prob = ident->drop_prob;
622 ident->drop_prob = allocate_grd(ident, comm_val);
624 /* Update other limiters with our rate by writing to comm layer. */
625 write_local_value(&ident->comm, ident->common.rate);
628 /* Update identity state. */
629 ident->common.last_rate = ident->common.rate;
633 * Traces all of the parent pointers of a leaf all the way to the root in
634 * order to find the maximum drop probability in the chain.
// Walks from the parent identity of a leaf up through the parent pointers and
// tracks the largest drop_prob seen on the way.
636 static double find_leaf_drop_prob(leaf_t *leaf) {
637 identity_t *current = leaf->parent;
// The walk ends at the identity whose parent pointer is NULL.
642 while (current != NULL) {
643 if (current->drop_prob > result) {
644 result = current->drop_prob;
646 current = current->parent;
653 * This is called once per estimate interval to enforce the rate that allocate
654 * has decided upon. It makes calls to tc using system().
// Turns the decision made by allocate() into tc command lines, formatted into
// a fixed buffer of CMD_BUFFER_SIZE bytes.
//   FPS: clamps ident->locallimit and formats an htb class change whose ceil is
//        that limit.
//   GRD: updates the drop probability of every leaf of the identity and
//        formats a netem qdisc change per leaf.
// An unknown policy is reported at LOG_CRITICAL.
// NOTE(review): the device name eth0 is hard-coded in every command format.
// NOTE(review): confirm the snprintf results are checked for truncation.
656 static void enforce(limiter_t *limiter, identity_t *ident) {
657 char cmd[CMD_BUFFER_SIZE];
661 switch (limiter->policy) {
664 /* TC treats limits of 0 (8bit) as unlimited, which causes the
665 * entire rate limiting system to become unpredictable. In
666 * reality, we also don't want any limiter to be able to set its
667 * limit so low that it chokes all of the flows to the point that
668 * they can't increase. Thus, when we're setting a low limit, we
669 * make sure that it isn't too low by using the
670 * FLOW_START_THRESHOLD. */
672 if (ident->locallimit < FLOW_START_THRESHOLD) {
673 ident->locallimit = FLOW_START_THRESHOLD;
676 /* Do not allow the node to set a limit higher than its
677 * administratively assigned upper limit (bwcap). */
// A nodelimit of 0 disables this cap.
678 if (limiter->nodelimit != 0 && ident->locallimit > limiter->nodelimit) {
679 ident->locallimit = limiter->nodelimit;
682 if (system_loglevel == LOG_DEBUG) {
683 printf("FPS: Setting local limit to %d\n", ident->locallimit);
685 printlog(LOG_DEBUG, "%d Limit ID:%d\n", ident->locallimit, ident->id);
688 if (printcounter == PRINT_COUNTER_RESET) {
689 if (ident->common.max_flow_rate > 0) {
690 printlog(LOG_WARN, "%d ID:%d %.3f\n", ident->locallimit, ident->id,
691 (double) ident->common.rate / (double) ident->common.max_flow_rate);
693 printlog(LOG_WARN, "%d ID:%d 0\n", ident->locallimit, ident->id);
696 This is now done in print_statistics()
// htb class change: parent and class ids come from the identity, the ceil is
// the clamped local limit.
699 snprintf(cmd, CMD_BUFFER_SIZE,
700 "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %dbps quantum 1600",
701 ident->htb_parent, ident->htb_node, ident->locallimit);
703 if (do_enforcement) {
707 /* FIXME: call failed. What to do? */
708 printlog(LOG_CRITICAL, "***TC call failed? Call was: %s***\n", cmd);
// GRD: reconcile the drop probability of each leaf with this identity.
714 for (i = 0; i < ident->leaf_count; ++i) {
715 if (ident->drop_prob >= ident->leaves[i]->drop_prob) {
716 /* The new drop probability for this identity is greater
717 * than or equal to the leaf's current drop probability.
718 * We can safely use the larger value at this leaf
720 ident->leaves[i]->drop_prob = ident->drop_prob;
721 } else if (ident->last_drop_prob < ident->leaves[i]->drop_prob) {
722 /* The old drop probability for this identity is less than
723 * the leaf's current drop probability. This means that
724 * this identity couldn't have been the limiting ident,
725 * so nothing needs to be done because the old limiting
726 * ident is still the limiting factor. */
728 /* Intentionally blank. */
730 /* If neither of the above are true, then...
731 * 1) The new drop probability for the identity is less
732 * than what it previously was, and
733 * 2) This ident may have had the maximum drop probability
734 * of all idents limiting this leaf, and therefore we need
735 * to follow the leaf's parents up to the root to find the
736 * new leaf drop probability safely. */
737 ident->leaves[i]->drop_prob =
738 find_leaf_drop_prob(ident->leaves[i]);
741 /* Make the call to tc. */
// Two variants of the netem command follow; they differ only in the delay
// (40ms versus 0ms). NOTE(review): confirm which one the build selects.
// The loss value is the leaf drop probability scaled by 100.
743 snprintf(cmd, CMD_BUFFER_SIZE,
744 "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 40ms",
745 ident->leaves[i]->xid, ident->leaves[i]->xid,
746 (100 * ident->leaves[i]->drop_prob));
748 snprintf(cmd, CMD_BUFFER_SIZE,
749 "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 0ms",
750 ident->leaves[i]->xid, ident->leaves[i]->xid,
751 (100 * ident->leaves[i]->drop_prob));
753 if (do_enforcement) {
757 /* FIXME: call failed. What to do? */
758 printlog(LOG_CRITICAL, "***TC call failed?***\n");
766 printlog(LOG_CRITICAL, "DRL enforce: unknown policy %d\n",limiter->policy);
774 * This function is periodically called to clean the stable instance's flow
775 * accounting tables for each identity.
// Runs the table cleanup callback of every identity in instance->ident_map,
// holding the table mutex of each identity around its cleanup.
777 static void clean(drl_instance_t *instance) {
778 identity_t *ident = NULL;
// Restart the map iterator, then visit identities until map_next gives NULL.
780 map_reset_iterate(instance->ident_map);
781 while ((ident = map_next(instance->ident_map)) != NULL) {
782 pthread_mutex_lock(&ident->table_mutex);
784 ident->table_cleanup_function(ident->table);
786 #ifdef SHADOW_ACCTING
// The shadow table is always cleaned with the standard table cleanup.
788 standard_table_cleanup((standard_flow_table) ident->shadow_table);
792 pthread_mutex_unlock(&ident->table_mutex);
795 /* Periodically flush the log file. */
// For every identity in instance->ident_map: divides the accumulated avg_bytes
// by print_interval, logs the result at LOG_DEBUG and resets avg_bytes to 0.
//   instance       - instance whose identities are reported.
//   print_interval - number of samples accumulated since the last report.
799 static void print_averages(drl_instance_t *instance, int print_interval) {
800 identity_t *ident = NULL;
802 map_reset_iterate(instance->ident_map);
803 while ((ident = map_next(instance->ident_map)) != NULL) {
// avg_bytes holds the sum that allocate() adds to each interval.
804 ident->avg_bytes /= (double) print_interval;
805 //printf("avg_bytes = %f, print_interval = %d\n", ident->avg_bytes, print_interval);
// Same division by 128 as the used-rate log line in allocate().
806 printlog(LOG_DEBUG, "%.3f \t Avg rate. ID:%d\n",
807 ident->avg_bytes / 128, ident->id);
808 //printf("%.3f \t Avg rate. ID:%d\n",
809 // ident->avg_bytes / 128, ident->id);
810 ident->avg_bytes = 0;
814 /** Thread function to handle local rate estimation.
816 * None of our simple hashmap functions are thread safe, so we lock the limiter
817 * with an rwlock to prevent another thread from attempting to modify the set
820 * Each identity also has a private lock for its table. This gets locked by
821 * table-modifying functions such as estimate and clean. It's also locked in
822 * ulogd_DRL.c when the table is being updated with new packets.
// Entry point of the estimation thread. arg is the limiter_t to service.
// Each tick it takes the limiter read lock, drains the calendar slot for the
// current tick, runs estimate / allocate / enforce or send_update for every
// valid action found there, re-queues the action in a later slot, and then
// advances the calendar slot before releasing the lock.
824 void handle_estimation(void *arg) {
825 limiter_t *limiter = (limiter_t *) arg;
826 int clean_timer, clean_wait_intervals;
// Estimate interval converted from milliseconds to microseconds.
827 useconds_t sleep_time = limiter->estintms * 1000;
828 uint32_t cal_slot = 0;
// Number of estimate intervals per second.
829 int print_interval = 1000 / (limiter->estintms);
831 sigset_t signal_mask;
// Block SIGHUP, SIGUSR1 and SIGUSR2 in this thread.
833 sigemptyset(&signal_mask);
834 sigaddset(&signal_mask, SIGHUP);
835 sigaddset(&signal_mask, SIGUSR1);
836 sigaddset(&signal_mask, SIGUSR2);
837 pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
839 /* Determine the number of intervals we should wait before hitting the
840 * specified clean interval. (Converts seconds -> intervals). */
841 clean_wait_intervals = IDENT_CLEAN_INTERVAL * (1000.0 / limiter->estintms);
842 clean_timer = clean_wait_intervals;
845 printlog(LOG_DEBUG, "--Beginning new tick.--\n");
847 /* Sleep according to the delay of the estimate interval. */
850 /* Grab the limiter lock for reading. This prevents identities from
851 * disappearing beneath our feet. */
852 pthread_rwlock_rdlock(&limiter->limiter_lock);
// The calendar is indexed by the slot counter masked with SCHEDMASK.
854 cal_slot = limiter->stable_instance.cal_slot & SCHEDMASK;
856 /* Service all the identities that are scheduled to run during this
858 while (!TAILQ_EMPTY(limiter->stable_instance.cal + cal_slot)) {
// Pop the head of the queue for this slot.
859 identity_action *iaction = TAILQ_FIRST(limiter->stable_instance.cal + cal_slot);
860 TAILQ_REMOVE(limiter->stable_instance.cal + cal_slot, iaction, calendar);
862 /* Only execute the action if it is valid. */
863 if (iaction->valid == 0) {
868 switch (iaction->action) {
869 case ACTION_MAINLOOP:
871 printlog(LOG_DEBUG, "Main loop: identity %d\n", iaction->ident->id);
873 /* Update the ident's flow accounting table with the latest info. */
874 estimate(iaction->ident, limiter->estintms);
876 /* Determine its share of the rate allocation. */
877 allocate(limiter, iaction->ident);
879 /* Make tc calls to enforce the rate we decided upon. */
880 enforce(limiter, iaction->ident);
882 /* Add ident back to the queue at a future time slot. */
// Target slot is mainloop_intervals ahead, wrapped with SCHEDMASK.
883 TAILQ_INSERT_TAIL(limiter->stable_instance.cal +
884 ((cal_slot + iaction->ident->mainloop_intervals) & SCHEDMASK),
888 case ACTION_COMMUNICATE:
890 printlog(LOG_DEBUG, "Communicating: identity %d\n", iaction->ident->id);
892 /* Tell the comm library to propagate this identity's result for
894 send_update(&iaction->ident->comm, iaction->ident->id);
896 /* Add ident back to the queue at a future time slot. */
// Target slot is communication_intervals ahead, wrapped with SCHEDMASK.
897 TAILQ_INSERT_TAIL(limiter->stable_instance.cal +
898 ((cal_slot + iaction->ident->communication_intervals) & SCHEDMASK),
903 printlog(LOG_CRITICAL, "Unknown identity action!?!\n");
// In debug mode, report averages once the interval counter has run out and
// restart the counter at one second worth of intervals.
909 if (loglevel() == LOG_DEBUG && print_interval <= 0) {
910 print_interval = 1000 / (limiter->estintms);
911 print_averages(&limiter->stable_instance, print_interval);
914 /* Check if enough intervals have passed for cleaning. */
915 if (clean_timer <= 0) {
916 clean(&limiter->stable_instance);
917 clean_timer = clean_wait_intervals;
// Advance to the next calendar slot before releasing the read lock.
922 limiter->stable_instance.cal_slot += 1;
924 pthread_rwlock_unlock(&limiter->limiter_lock);