/* See the DRL-LICENSE file for this file's software license. */ /* * Thread to periodically calculate the estimated local limits * Barath Raghavan 2006/2007 * Ken Yocum 2007 * Kevin Webb 2007/2008 */ #include /** The size of the buffer we use to hold tc commands. */ #define CMD_BUFFER_SIZE 200 /* DRL specifics */ #include "raterouter.h" #include "util.h" #include "ratetypes.h" /* needs util and pthread.h */ #include "calendar.h" #include "logging.h" extern uint8_t system_loglevel; uint8_t do_enforcement = 0; /** * Called for each identity each estimate interval. Uses flow table information * to estimate the current aggregate rate and the rate of the individual flows * in the table. */ static void estimate(identity_t *ident, const double estintms) { struct timeval now; double time_difference; pthread_mutex_lock(&ident->table_mutex); /* CLUNK ! */ gettimeofday(&now, NULL); time_difference = timeval_subtract(now, ident->common.last_update); if (time_difference > .01 + (estintms / 1000 * ident->mainloop_intervals)) { printlog(LOG_WARN, "Missed interval: Scheduled for %.2f ms, actual %.2fms\n", estintms * ident->mainloop_intervals, time_difference * 1000); } ident->table_update_function(ident->table, now, ident->ewma_weight); #ifdef SHADOW_ACCTING standard_table_update_flows((standard_flow_table) ident->shadow_table, now, ident->ewma_weight); #endif pthread_mutex_unlock(&ident->table_mutex); /* CLINK ! */ } /** * Determines the FPS weight allocation when the identity is under its current * local rate limit. */ static double allocate_fps_under_limit(identity_t *ident, uint32_t target, double peer_weights) { double ideal_weight; double total_weight = peer_weights + ident->last_localweight; if (target >= ident->effective_limit) { ideal_weight = total_weight; } else if (target <= 0) { ideal_weight = 0; // no flows here } else { ideal_weight = ((double)target / (double)ident->effective_limit) * total_weight; } return ideal_weight; } /** * Determines the FPS weight allocation when the identity is over its current * local rate limit. */ static double allocate_fps_over_limit(identity_t *ident) { double ideal_weight; double total_over_max; if (ident->common.max_flow_rate > 0) { ideal_weight = (double) ident->locallimit / (double) ident->common.max_flow_rate; total_over_max = (double) ident->common.rate / (double) ident->common.max_flow_rate; printlog(LOG_DEBUG, "ideal_over: %.3f, limit: %d, max_flow_rate: %d, total_rate: %d, total/max: %.3f\n", ideal_weight, ident->locallimit, ident->common.max_flow_rate, ident->common.rate, total_over_max); } else { ideal_weight = 1; } return ideal_weight; } /** * When FPS checks to see which mode it should be operating in * (over limit vs under limit), we don't want it to actually look to * see if we're at the limit. Instead, we want to see if we're getting * close to the limit. This defines how close is "close enough". * * For example, if the limit is 50000 and we're sending 49000, we probably * want to be in the over limit mode, even if we aren't actually over the limit * in order to switch to the more aggressive weight calculations. */ static inline uint32_t close_enough(uint32_t limit) { uint32_t difference = limit - (limit * CLOSE_ENOUGH); if (difference < 10240) { return (limit - 10240); } else { return (limit * CLOSE_ENOUGH); } } static void print_statistics(identity_t *ident, const double ideal_weight, const double total_weight, const double localweight, const char *identifier, common_accounting_t *table, const uint32_t resulting_limit) { struct timeval tv; double time_now; gettimeofday(&tv, NULL); time_now = (double) tv.tv_sec + (double) ((double) tv.tv_usec / (double) 1000000); printlog(LOG_WARN, "%.2f %d %.2f %.2f %.2f %d %d %d %d %d %d %d %d %d %s:%d ", time_now, table->inst_rate, ideal_weight, localweight, total_weight, table->num_flows, table->num_flows_5k, table->num_flows_10k, table->num_flows_20k, table->num_flows_50k, table->avg_rate, table->max_flow_rate, table->max_flow_rate_flow_hash, resulting_limit, identifier, ident->id); if (table->max_flow_rate > 0) { printlog(LOG_WARN, "%.3f\n", (double) table->rate / (double) table->max_flow_rate); } else { printlog(LOG_WARN, "0\n"); } /* Print to the screen in debug mode. */ if (system_loglevel == LOG_DEBUG) { printf("Local Rate: %d, Ideal Weight: %.3f, Local Weight: %.3f, Total Weight: %.3f\n", table->rate, ideal_weight, ident->localweight, total_weight); } } static uint32_t allocate_fps(identity_t *ident, double total_weight, common_accounting_t *table, const char *identifier) { uint32_t resulting_limit = 0; double ideal_weight = 0.0; double peer_weights = total_weight - ident->last_localweight; /* Keep track of these for measurements & comparisons only. */ double ideal_under = 0.0; double ideal_over = 0.0; /* Weight sanity. */ if (peer_weights < 0.0) { peer_weights = 0.0; } if (ident->dampen_state == DAMPEN_TEST) { int64_t rate_delta = (int64_t) table->inst_rate - (int64_t) table->last_inst_rate; double threshold = (double) ident->effective_limit * (double) LARGE_INCREASE_PERCENTAGE / 10; if (rate_delta > threshold) { ident->dampen_state = DAMPEN_PASSED; } else { ident->dampen_state = DAMPEN_FAILED; } } /* Rate/weight sanity. */ if (table->rate <= 0) { ideal_weight = 0.0; } /* Under the limit OR we failed our dampening test OR our current * outgoing traffic rate is under the low "flowstart" watermark. */ else if (ident->dampen_state == DAMPEN_FAILED || table->rate < close_enough(ident->locallimit)) { #if 0 || ident->flowstart) { uint32_t target_rate = table->rate; if (ident->flowstart) { target_rate *= 4; if (table->rate >= FLOW_START_THRESHOLD) { ident->flowstart = false; } } else { /* June 16, 2008 (KCW) * ident->flowstart gets set initially to one, but it is never set again. However, * if a limiter gets flows and then the number of flows drops to zero, it has trouble * increasing the limit again. */ if (table->rate < FLOW_START_THRESHOLD) { ident->flowstart = true; } } Old flowstart code. #endif //printf("rate is %d, close enough is %d, difference is %d\n", table->rate, close_enough(ident->locallimit), close_enough(ident->locallimit) - table->rate); /* Boost low-limits so that they have room to grow. */ if (table->rate < FLOW_START_THRESHOLD) { ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate * 4, peer_weights); } else { ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights); } ideal_over = allocate_fps_over_limit(ident); if (ideal_over < ideal_under) { /* Degenerate case in which the agressive weight calculation was * actually less than the under-the-limit case. Use it instead * and skip the dampening check in the next interval. */ ideal_weight = ideal_over; ident->dampen_state = DAMPEN_SKIP; } else { ident->dampen_state = DAMPEN_NONE; } /* Apply EWMA. */ ident->localweight = (ident->localweight * ident->ewma_weight + ideal_weight * (1 - ident->ewma_weight)); } /* At or over the limit. Use the aggressive weight calculation. */ else { double portion_last_interval = 0.0; double portion_this_interval = 0.0; ideal_weight = ideal_over = allocate_fps_over_limit(ident); ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights); /* Apply EWMA. */ ident->localweight = (ident->localweight * ident->ewma_weight + ideal_weight * (1 - ident->ewma_weight)); /* Now check whether the result of the aggressive weight calculation * increases our portion of the weight "too much", in which case we * dampen it. */ /* Our portion of weight in the whole system during the last interval.*/ portion_last_interval = ident->last_localweight / total_weight; /* Our proposed portion of weight for the current interval. */ portion_this_interval = ident->localweight / (peer_weights + ident->localweight); if (ident->dampen_state == DAMPEN_NONE && (portion_this_interval - portion_last_interval > LARGE_INCREASE_PERCENTAGE)) { ident->localweight = ident->last_localweight + (LARGE_INCREASE_PERCENTAGE * total_weight); ident->dampen_state = DAMPEN_TEST; } else { ident->dampen_state = DAMPEN_SKIP; } } /* Add the weight calculated in this interval to the total. */ ident->total_weight = total_weight = ident->localweight + peer_weights; /* Convert weight value into a rate limit. If there is no measureable * weight, do a L/n allocation. */ if (total_weight > 0) { resulting_limit = (uint32_t) (ident->localweight * ident->effective_limit / total_weight); } else { resulting_limit = (uint32_t) (ident->limit / (ident->comm.remote_node_count + 1)); } print_statistics(ident, ideal_weight, total_weight, ident->localweight, identifier, table, resulting_limit); return resulting_limit; } #ifdef SHADOW_ACCTING /* Runs through the allocate functionality without making any state changes to * the identity. Useful for comparisons, especially for comparing standard * and sample&hold accounting schemes. */ static void allocate_fps_pretend(identity_t *ident, double total_weight, common_accounting_t *table, const char *identifier) { uint32_t resulting_limit = 0; double ideal_weight = 0.0; double peer_weights = total_weight - ident->last_localweight_copy; double ideal_under = 0.0; double ideal_over = 0.0; if (peer_weights < 0.0) { peer_weights = 0.0; } if (ident->dampen_state_copy == DAMPEN_TEST) { int64_t rate_delta = (int64_t) table->inst_rate - (int64_t) table->last_inst_rate; double threshold = (double) ident->effective_limit * (double) LARGE_INCREASE_PERCENTAGE / 10; if (rate_delta > threshold) { ident->dampen_state_copy = DAMPEN_PASSED; } else { ident->dampen_state_copy = DAMPEN_FAILED; } } /* Rate/weight sanity. */ if (table->rate <= 0) { ideal_weight = 0.0; } /* Under the limit OR we failed our dampening test OR our current * outgoing traffic rate is under the low "flowstart" watermark. */ else if (ident->dampen_state_copy == DAMPEN_FAILED || table->rate < close_enough(ident->locallimit)) { /* Boost low-limits so that they have room to grow. */ if (table->rate < FLOW_START_THRESHOLD) { ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate * 4, peer_weights); } else { ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights); } ideal_over = allocate_fps_over_limit(ident); if (ideal_over < ideal_under) { /* Degenerate case in which the agressive weight calculation was * actually less than the under-the-limit case. Use it instead * and skip the dampening check in the next interval. */ ideal_weight = ideal_over; ident->dampen_state_copy = DAMPEN_SKIP; } else { ident->dampen_state_copy = DAMPEN_NONE; } /* Apply EWMA. */ ident->localweight_copy = (ident->localweight_copy * ident->ewma_weight + ideal_weight * (1 - ident->ewma_weight)); } /* At or over the limit. Use the aggressive weight calculation. */ else { double portion_last_interval = 0.0; double portion_this_interval = 0.0; ideal_weight = ideal_over = allocate_fps_over_limit(ident); ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights); /* Apply EWMA. */ ident->localweight_copy = (ident->localweight_copy * ident->ewma_weight + ideal_weight * (1 - ident->ewma_weight)); /* Now check whether the result of the aggressive weight calculation * increases our portion of the weight "too much", in which case we * dampen it. */ /* Our portion of weight in the whole system during the last interval.*/ portion_last_interval = ident->last_localweight / total_weight; /* Our proposed portion of weight for the current interval. */ portion_this_interval = ident->localweight_copy / (peer_weights + ident->localweight_copy); if (ident->dampen_state_copy == DAMPEN_NONE && (portion_this_interval - portion_last_interval > LARGE_INCREASE_PERCENTAGE)) { ident->localweight_copy = ident->last_localweight + (LARGE_INCREASE_PERCENTAGE * total_weight); ident->dampen_state_copy = DAMPEN_TEST; } else { ident->dampen_state_copy = DAMPEN_SKIP; } } /* Add the weight calculated in this interval to the total. */ total_weight = ident->localweight_copy + peer_weights; /* Convert weight value into a rate limit. If there is no measureable * weight, do a L/n allocation. */ if (total_weight > 0) { resulting_limit = (uint32_t) (ident->localweight_copy * ident->effective_limit / total_weight); } else { resulting_limit = (uint32_t) (ident->limit / (ident->comm.remote_node_count + 1)); } print_statistics(ident, ideal_weight, total_weight, ident->localweight_copy, identifier, table, resulting_limit); } #endif /** * Determines the local drop probability for a GRD identity every estimate * interval. */ static double allocate_grd(identity_t *ident, double aggdemand) { double dropprob; double min_dropprob = ident->drop_prob * GRD_BIG_DROP; struct timeval tv; double time_now; common_accounting_t *table = &ident->common; gettimeofday(&tv, NULL); time_now = (double) tv.tv_sec + (double) ((double) tv.tv_usec / (double) 1000000); if (aggdemand > ident->effective_limit) { dropprob = (aggdemand - ident->effective_limit) / aggdemand; } else { dropprob = 0.0; } if (dropprob > 0.01 && dropprob < min_dropprob) { dropprob = min_dropprob; } if (system_loglevel == LOG_DEBUG) { printf("local rate: %d, aggregate demand: %.3f, drop prob: %.3f\n", ident->common.rate, aggdemand, dropprob); } if (table->max_flow_rate > 0) { printlog(LOG_WARN, "%.2f %d 0 0 %.2f %d %d %d %d %d %d %d %d %.2f ID:%d %.3f\n", time_now, table->inst_rate, aggdemand, table->num_flows, table->num_flows_5k, table->num_flows_10k, table->num_flows_20k, table->num_flows_50k, table->avg_rate, table->max_flow_rate, table->max_flow_rate_flow_hash, dropprob, ident->id, (double) table->rate / (double) table->max_flow_rate); } else { printlog(LOG_WARN, "%.2f %d 0 0 %.2f %d %d %d %d %d %d %d %d %.2f ID:%d 0\n", time_now, table->inst_rate, aggdemand, table->num_flows, table->num_flows_5k, table->num_flows_10k, table->num_flows_20k, table->num_flows_50k, table->avg_rate, table->max_flow_rate, table->max_flow_rate_flow_hash, dropprob, ident->id); } return dropprob; } /** * Given current estimates of local rate (weight) and remote rates (weights) * use GRD or FPS to calculate a new local limit. */ static void allocate(limiter_t *limiter, identity_t *ident) { /* Represents aggregate rate for GRD and aggregate weight for FPS. */ double aggregate = 0; /* Read aggregate from comm layer. */ read_comm(&aggregate, &ident->effective_limit, &ident->comm, ident->limit); printlog(LOG_DEBUG, "%.3f Aggregate weight/rate (FPS/GRD)\n", aggregate); /* Experimental printing. */ printlog(LOG_DEBUG, "%.3f \t Kbps used rate. ID:%d\n", (double) ident->common.rate / (double) 128, ident->id); ident->avg_bytes += ident->common.rate; if (limiter->policy == POLICY_FPS) { #ifdef SHADOW_ACCTING allocate_fps_pretend(ident, aggregate, &ident->shadow_common, "SHADOW-ID"); ident->last_localweight_copy = ident->localweight_copy; #endif ident->locallimit = allocate_fps(ident, aggregate, &ident->common, "ID"); ident->last_localweight = ident->localweight; /* Update other limiters with our weight by writing to comm layer. */ write_local_value(&ident->comm, ident->localweight); } else { ident->last_drop_prob = ident->drop_prob; ident->drop_prob = allocate_grd(ident, aggregate); /* Update other limiters with our rate by writing to comm layer. */ write_local_value(&ident->comm, ident->common.rate); } /* Update identity state. */ ident->common.last_rate = ident->common.rate; } /** * Traces all of the parent pointers of a leaf all the way to the root in * order to find the maximum drop probability in the chain. */ static double find_leaf_drop_prob(leaf_t *leaf) { identity_t *current = leaf->parent; double result = 0; assert(current); while (current != NULL) { if (current->drop_prob > result) { result = current->drop_prob; } current = current->parent; } return result; } /** * This is called once per estimate interval to enforce the rate that allocate * has decided upon. It makes calls to tc using system(). */ static void enforce(limiter_t *limiter, identity_t *ident) { char cmd[CMD_BUFFER_SIZE]; int ret = 0; int i = 0; switch (limiter->policy) { case POLICY_FPS: /* TC treats limits of 0 (8bit) as unlimited, which causes the * entire rate limiting system to become unpredictable. In * reality, we also don't want any limiter to be able to set its * limit so low that it chokes all of the flows to the point that * they can't increase. Thus, when we're setting a low limit, we * make sure that it isn't too low by using the * FLOW_START_THRESHOLD. */ if (ident->locallimit < FLOW_START_THRESHOLD) { ident->locallimit = FLOW_START_THRESHOLD; } /* Do not allow the node to set a limit higher than its * administratively assigned upper limit (bwcap). */ if (limiter->nodelimit != 0 && ident->locallimit > limiter->nodelimit) { ident->locallimit = limiter->nodelimit; } if (system_loglevel == LOG_DEBUG) { printf("FPS: Setting local limit to %d\n", ident->locallimit); } printlog(LOG_DEBUG, "%d Limit ID:%d\n", ident->locallimit, ident->id); #if 0 if (printcounter == PRINT_COUNTER_RESET) { if (ident->common.max_flow_rate > 0) { printlog(LOG_WARN, "%d ID:%d %.3f\n", ident->locallimit, ident->id, (double) ident->common.rate / (double) ident->common.max_flow_rate); } else { printlog(LOG_WARN, "%d ID:%d 0\n", ident->locallimit, ident->id); } } This is now done in print_statistics() #endif snprintf(cmd, CMD_BUFFER_SIZE, "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %dbps quantum 1600", ident->htb_parent, ident->htb_node, ident->locallimit); if (do_enforcement) { ret = system(cmd); if (ret) { /* FIXME: call failed. What to do? */ printlog(LOG_CRITICAL, "***TC call failed? Call was: %s***\n", cmd); } } break; case POLICY_GRD: for (i = 0; i < ident->leaf_count; ++i) { if (ident->drop_prob >= ident->leaves[i]->drop_prob) { /* The new drop probability for this identity is greater * than or equal to the leaf's current drop probability. * We can safely use the larger value at this leaf * immediately. */ ident->leaves[i]->drop_prob = ident->drop_prob; } else if (ident->last_drop_prob < ident->leaves[i]->drop_prob) { /* The old drop probability for this identity is less than * the leaf's current drop probability. This means that * this identity couldn't have been the limiting ident, * so nothing needs to be done because the old limiting * ident is still the limiting factor. */ /* Intentionally blank. */ } else { /* If neither of the above are true, then... * 1) The new drop probability for the identity is less * than what it previously was, and * 2) This ident may have had the maximum drop probability * of all idents limiting this leaf, and therefore we need * to follow the leaf's parents up to the root to find the * new leaf drop probability safely. */ ident->leaves[i]->drop_prob = find_leaf_drop_prob(ident->leaves[i]); } /* Make the call to tc. */ snprintf(cmd, CMD_BUFFER_SIZE, "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay %dms", ident->leaves[i]->xid, ident->leaves[i]->xid, (100 * ident->leaves[i]->drop_prob), ident->leaves[i]->delay); if (do_enforcement) { ret = system(cmd); if (ret) { /* FIXME: call failed. What to do? */ printlog(LOG_CRITICAL, "***TC call failed?***\n"); } } } break; default: printlog(LOG_CRITICAL, "DRL enforce: unknown policy %d\n",limiter->policy); break; } return; } /** * This function is periodically called to clean the stable instance's flow * accounting tables for each identity. */ static void clean(drl_instance_t *instance) { identity_t *ident = NULL; map_reset_iterate(instance->ident_map); while ((ident = map_next(instance->ident_map)) != NULL) { pthread_mutex_lock(&ident->table_mutex); ident->table_cleanup_function(ident->table); #ifdef SHADOW_ACCTING standard_table_cleanup((standard_flow_table) ident->shadow_table); #endif pthread_mutex_unlock(&ident->table_mutex); } /* Periodically flush the log file. */ flushlog(); } static void print_averages(drl_instance_t *instance, int print_interval) { identity_t *ident = NULL; map_reset_iterate(instance->ident_map); while ((ident = map_next(instance->ident_map)) != NULL) { ident->avg_bytes /= (double) print_interval; //printf("avg_bytes = %f, print_interval = %d\n", ident->avg_bytes, print_interval); printlog(LOG_DEBUG, "%.3f \t Avg rate. ID:%d\n", ident->avg_bytes / 128, ident->id); //printf("%.3f \t Avg rate. ID:%d\n", // ident->avg_bytes / 128, ident->id); ident->avg_bytes = 0; } } /** Thread function to handle local rate estimation. * * None of our simple hashmap functions are thread safe, so we lock the limiter * with an rwlock to prevent another thread from attempting to modify the set * of identities. * * Each identity also has a private lock for its table. This gets locked by * table-modifying functions such as estimate and clean. It's also locked in * ulogd_DRL.c when the table is being updated with new packets. */ void handle_estimation(void *arg) { limiter_t *limiter = (limiter_t *) arg; int clean_timer, clean_wait_intervals; useconds_t sleep_time = limiter->estintms * 1000; uint32_t cal_slot = 0; int print_interval = 1000 / (limiter->estintms); sigset_t signal_mask; sigemptyset(&signal_mask); sigaddset(&signal_mask, SIGHUP); sigaddset(&signal_mask, SIGUSR1); sigaddset(&signal_mask, SIGUSR2); sigaddset(&signal_mask, SIGRTMAX); pthread_sigmask(SIG_BLOCK, &signal_mask, NULL); /* Determine the number of intervals we should wait before hitting the * specified clean interval. (Converts seconds -> intervals). */ clean_wait_intervals = IDENT_CLEAN_INTERVAL * (1000.0 / limiter->estintms); clean_timer = clean_wait_intervals; while (true) { printlog(LOG_DEBUG, "--Beginning new tick.--\n"); /* Sleep according to the delay of the estimate interval. */ usleep(sleep_time); /* Grab the limiter lock for reading. This prevents identities from * disappearing beneath our feet. */ pthread_rwlock_rdlock(&limiter->limiter_lock); cal_slot = limiter->stable_instance.cal_slot & SCHEDMASK; /* Service all the identities that are scheduled to run during this * tick. */ while (!TAILQ_EMPTY(limiter->stable_instance.cal + cal_slot)) { identity_action *iaction = TAILQ_FIRST(limiter->stable_instance.cal + cal_slot); TAILQ_REMOVE(limiter->stable_instance.cal + cal_slot, iaction, calendar); /* Only execute the action if it is valid. */ if (iaction->valid == 0) { free(iaction); continue; } switch (iaction->action) { case ACTION_MAINLOOP: printlog(LOG_DEBUG, "Main loop: identity %d\n", iaction->ident->id); /* Update the ident's flow accouting table with the latest info. */ estimate(iaction->ident, limiter->estintms); /* Determine its share of the rate allocation. */ allocate(limiter, iaction->ident); /* Make tc calls to enforce the rate we decided upon. */ enforce(limiter, iaction->ident); /* Add ident back to the queue at a future time slot. */ TAILQ_INSERT_TAIL(limiter->stable_instance.cal + ((cal_slot + iaction->ident->mainloop_intervals) & SCHEDMASK), iaction, calendar); break; case ACTION_COMMUNICATE: printlog(LOG_DEBUG, "Communicating: identity %d\n", iaction->ident->id); /* Tell the comm library to propagate this identity's result for * this interval.*/ send_update(&iaction->ident->comm, iaction->ident->id); /* Add ident back to the queue at a future time slot. */ TAILQ_INSERT_TAIL(limiter->stable_instance.cal + ((cal_slot + iaction->ident->communication_intervals) & SCHEDMASK), iaction, calendar); break; default: printlog(LOG_CRITICAL, "Unknown identity action!?!\n"); exit(EXIT_FAILURE); } } print_interval--; if (loglevel() == LOG_DEBUG && print_interval <= 0) { print_interval = 1000 / (limiter->estintms); print_averages(&limiter->stable_instance, print_interval); } /* Check if enough intervals have passed for cleaning. */ if (clean_timer <= 0) { clean(&limiter->stable_instance); clean_timer = clean_wait_intervals; } else { clean_timer--; } limiter->stable_instance.cal_slot += 1; pthread_rwlock_unlock(&limiter->limiter_lock); } }