X-Git-Url: http://git.onelab.eu/?p=distributedratelimiting.git;a=blobdiff_plain;f=drl%2Festimate.c;h=29704bee49307796bcb86c2458049405fe1f38f6;hp=f1dd142557caf0519aa15641315b7bac07cd58e1;hb=a3ef11b996352e66f4031c684c7dcda536bc3bf1;hpb=0be9704d6b24d09ebd55beedec52758cb88c570b diff --git a/drl/estimate.c b/drl/estimate.c index f1dd142..29704be 100644 --- a/drl/estimate.c +++ b/drl/estimate.c @@ -7,6 +7,8 @@ * Kevin Webb 2007/2008 */ +#include + /** The size of the buffer we use to hold tc commands. */ #define CMD_BUFFER_SIZE 200 @@ -16,8 +18,12 @@ #include "ratetypes.h" /* needs util and pthread.h */ #include "logging.h" -static int underlimit_flowcount_count = 0; -static int underlimit_normal_count = 0; +#define PRINT_COUNTER_RESET (7) + +extern uint8_t system_loglevel; +static int printcounter = PRINT_COUNTER_RESET - 1; + +uint8_t do_enforcement = 0; /** * Called for each identity each estimate interval. Uses flow table information @@ -153,7 +159,8 @@ static uint32_t allocate_fps(identity_t *ident, double total_weight) { if (local_rate <= 0) { idealweight = 0; - } else if (dampen_increase == 0 && (ident->locallimit <= 0 || local_rate < ident->locallimit || ident->flowstart)) { + } else if (dampen_increase == 0 && + (ident->locallimit <= 0 || local_rate < (ident->locallimit * CLOSE_ENOUGH) || ident->flowstart)) { /* We're under the limit - all flows are bottlenecked. */ idealweight = allocate_fps_under_limit(ident, local_rate, peer_weights); ideal_over = allocate_fps_over_limit(ident); @@ -163,11 +170,9 @@ static uint32_t allocate_fps(identity_t *ident, double total_weight) { idealweight = ideal_over; regime = 3; dampen = 2; - underlimit_flowcount_count += 1; } else { regime = 1; dampen = 0; - underlimit_normal_count += 1; } /* Apply EWMA */ @@ -204,7 +209,7 @@ static uint32_t allocate_fps(identity_t *ident, double total_weight) { } /* Convert weight into a rate - add in our new local weight */ - total_weight = ident->localweight + peer_weights; + ident->total_weight = total_weight = ident->localweight + peer_weights; /* compute local allocation: if there is traffic elsewhere, use the weights @@ -226,14 +231,23 @@ static uint32_t allocate_fps(identity_t *ident, double total_weight) { printlog(LOG_DEBUG, "%.3f %.3f IdealUnd IdealOve\n",ideal_under,ideal_over); - printf("local_rate: %d, idealweight: %.3f, localweight: %.3f, total_weight: %.3f\n", + if (system_loglevel == LOG_DEBUG) { + printf("local_rate: %d, idealweight: %.3f, localweight: %.3f, total_weight: %.3f\n", local_rate, idealweight, ident->localweight, total_weight); + } + + if (printcounter <= 0) { + printlog(LOG_WARN, "%d %.1f %.1f %.1f %d %d %d %d %d %d ", local_rate, idealweight, + ident->localweight, total_weight, ftable->num_flows, ftable->num_flows_5k, ftable->num_flows_10k, + ftable->num_flows_20k, ftable->num_flows_50k, ftable->avg_rate); + printcounter = PRINT_COUNTER_RESET; + } else { + printcounter -= 1; + } //printf("Dampen: %d, dampen_increase: %d, peer_weights: %.3f, regime: %d\n", // dampen, dampen_increase, peer_weights, regime); - //printf("normal_count: %d, flowcount_count: %d\n", underlimit_normal_count, underlimit_flowcount_count); - if (regime == 3) { printlog(LOG_DEBUG, "MIN: min said to use flow counting, which was %.3f when other method said %.3f.\n", ideal_over, ideal_under); @@ -258,8 +272,10 @@ static double allocate_grd(identity_t *ident, double aggdemand) { dropprob = 0.0; } - //printf("local rate: %d, aggregate demand: %.3f, drop prob: %.3f\n", - // ident->common.rate, aggdemand, dropprob); + if (system_loglevel == LOG_DEBUG) { + printf("local rate: %d, aggregate demand: %.3f, drop prob: %.3f\n", + ident->common.rate, aggdemand, dropprob); + } return dropprob; } @@ -273,7 +289,13 @@ static void allocate(limiter_t *limiter, identity_t *ident) { double comm_val = 0; /* Read comm_val from comm layer. */ - read_comm(&ident->comm, &comm_val); + if (limiter->policy == POLICY_FPS) { + read_comm(&ident->comm, &comm_val, + ident->total_weight / (double) (ident->comm.remote_node_count + 1)); + } else { + read_comm(&ident->comm, &comm_val, + (double) (ident->limit / (double) (ident->comm.remote_node_count + 1))); + } printlog(LOG_DEBUG, "%.3f Aggregate weight/rate (FPS/GRD)\n", comm_val); /* Experimental printing. */ @@ -281,7 +303,7 @@ static void allocate(limiter_t *limiter, identity_t *ident) { (double) ident->common.rate / (double) 128, ident->id); ident->avg_bytes += ident->common.rate; - if (limiter->policynum == POLICY_FPS) { + if (limiter->policy == POLICY_FPS) { ident->locallimit = allocate_fps(ident, comm_val); ident->last_localweight = ident->localweight; @@ -289,8 +311,8 @@ static void allocate(limiter_t *limiter, identity_t *ident) { write_local_value(&ident->comm, ident->localweight); } else { ident->locallimit = 0; /* Unused with GRD. */ - ident->last_localdropprob = ident->localdropprob; - ident->localdropprob = allocate_grd(ident, comm_val); + ident->last_drop_prob = ident->drop_prob; + ident->drop_prob = allocate_grd(ident, comm_val); /* Update other limiters with our rate by writing to comm layer. */ write_local_value(&ident->comm, ident->common.rate); @@ -300,6 +322,26 @@ static void allocate(limiter_t *limiter, identity_t *ident) { ident->common.last_rate = ident->common.rate; } +/** + * Traces all of the parent pointers of a leaf all the way to the root in + * order to find the maximum drop probability in the chain. + */ +static double find_leaf_drop_prob(leaf_t *leaf) { + identity_t *current = leaf->parent; + double result = 0; + + assert(current); + + while (current != NULL) { + if (current->drop_prob > result) { + result = current->drop_prob; + } + current = current->parent; + } + + return result; +} + /** * This is called once per estimate interval to enforce the rate that allocate * has decided upon. It makes calls to tc using system(). @@ -307,8 +349,9 @@ static void allocate(limiter_t *limiter, identity_t *ident) { static void enforce(limiter_t *limiter, identity_t *ident) { char cmd[CMD_BUFFER_SIZE]; int ret = 0; + int i = 0; - switch (limiter->policynum) { + switch (limiter->policy) { case POLICY_FPS: /* TC treats limits of 0 (8bit) as unlimited, which causes the @@ -320,7 +363,7 @@ static void enforce(limiter_t *limiter, identity_t *ident) { * FLOW_START_THRESHOLD. */ if (ident->locallimit < FLOW_START_THRESHOLD) { - ident->locallimit = FLOW_START_THRESHOLD * 2; + ident->locallimit = FLOW_START_THRESHOLD; } /* Do not allow the node to set a limit higher than its @@ -329,38 +372,83 @@ static void enforce(limiter_t *limiter, identity_t *ident) { ident->locallimit = limiter->nodelimit; } - printf("FPS: Setting local limit to %d\n", ident->locallimit); + if (system_loglevel == LOG_DEBUG) { + printf("FPS: Setting local limit to %d\n", ident->locallimit); + } printlog(LOG_DEBUG, "%d Limit ID:%d\n", ident->locallimit, ident->id); + if (printcounter == PRINT_COUNTER_RESET) { + printlog(LOG_WARN, "%d\n", ident->locallimit); + } + snprintf(cmd, CMD_BUFFER_SIZE, "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %dbps quantum 1600", ident->htb_parent, ident->htb_node, ident->locallimit); - ret = system(cmd); + if (do_enforcement) { + ret = system(cmd); - if (ret) { - /* FIXME: call failed. What to do? */ + if (ret) { + /* FIXME: call failed. What to do? */ + printlog(LOG_CRITICAL, "***TC call failed?***\n"); + } } break; case POLICY_GRD: -/* FIXME: Figure out where to enforce GRD. */ -#if 0 - for (i = 0; i < ident->num_slices; i++){ - - sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 40ms", - ident->xids[i],ident->xids[i], (100*ident->localdropprob)); - - ret = system(cmd); - - if (ret==-1) - print_system_error(ret); - } + for (i = 0; i < ident->leaf_count; ++i) { + if (ident->drop_prob >= ident->leaves[i]->drop_prob) { + /* The new drop probability for this identity is greater + * than or equal to the leaf's current drop probability. + * We can safely use the larger value at this leaf + * immediately. */ + ident->leaves[i]->drop_prob = ident->drop_prob; + } else if (ident->last_drop_prob < ident->leaves[i]->drop_prob) { + /* The old drop probability for this identity is less than + * the leaf's current drop probability. This means that + * this identity couldn't have been the limiting ident, + * so nothing needs to be done because the old limiting + * ident is still the limiting factor. */ + + /* Intentionally blank. */ + } else { + /* If neither of the above are true, then... + * 1) The new drop probability for the identity is less + * than what it previously was, and + * 2) This ident may have had the maximum drop probability + * of all idents limiting this leaf, and therefore we need + * to follow the leaf's parents up to the root to find the + * new leaf drop probability safely. */ + ident->leaves[i]->drop_prob = + find_leaf_drop_prob(ident->leaves[i]); + } + + /* Make the call to tc. */ +#ifdef DELAY40MS + snprintf(cmd, CMD_BUFFER_SIZE, + "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 40ms", + ident->leaves[i]->xid, ident->leaves[i]->xid, + (100 * ident->leaves[i]->drop_prob)); +#else + snprintf(cmd, CMD_BUFFER_SIZE, + "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 0ms", + ident->leaves[i]->xid, ident->leaves[i]->xid, + (100 * ident->leaves[i]->drop_prob)); #endif + if (do_enforcement) { + ret = system(cmd); + + if (ret) { + /* FIXME: call failed. What to do? */ + printlog(LOG_CRITICAL, "***TC call failed?***\n"); + } + } + } + break; default: - printlog(LOG_CRITICAL, "DRL enforce: unknown policy %d\n",limiter->policynum); + printlog(LOG_CRITICAL, "DRL enforce: unknown policy %d\n",limiter->policy); break; } @@ -423,6 +511,7 @@ void handle_estimation(void *arg) { sigemptyset(&signal_mask); sigaddset(&signal_mask, SIGHUP); + sigaddset(&signal_mask, SIGUSR1); pthread_sigmask(SIG_BLOCK, &signal_mask, NULL); /* Determine the number of intervals we should wait before hitting the