* Kevin Webb 2007/2008
*/
+#include <assert.h>
+
/** The size of the buffer we use to hold tc commands. */
#define CMD_BUFFER_SIZE 200
#include "ratetypes.h" /* needs util and pthread.h */
#include "logging.h"
-static int underlimit_flowcount_count = 0;
-static int underlimit_normal_count = 0;
+#define PRINT_COUNTER_RESET (7)
+
+extern uint8_t system_loglevel;
+static int printcounter = PRINT_COUNTER_RESET - 1;
+
+uint8_t do_enforcement = 0;
/**
* Called for each identity each estimate interval. Uses flow table information
if (local_rate <= 0) {
idealweight = 0;
- } else if (dampen_increase == 0 && (ident->locallimit <= 0 || local_rate < ident->locallimit || ident->flowstart)) {
+ } else if (dampen_increase == 0 &&
+ (ident->locallimit <= 0 || local_rate < (ident->locallimit * CLOSE_ENOUGH) || ident->flowstart)) {
/* We're under the limit - all flows are bottlenecked. */
idealweight = allocate_fps_under_limit(ident, local_rate, peer_weights);
ideal_over = allocate_fps_over_limit(ident);
idealweight = ideal_over;
regime = 3;
dampen = 2;
- underlimit_flowcount_count += 1;
} else {
regime = 1;
dampen = 0;
- underlimit_normal_count += 1;
}
/* Apply EWMA */
}
/* Convert weight into a rate - add in our new local weight */
- total_weight = ident->localweight + peer_weights;
+ ident->total_weight = total_weight = ident->localweight + peer_weights;
/* compute local allocation:
if there is traffic elsewhere, use the weights
printlog(LOG_DEBUG, "%.3f %.3f IdealUnd IdealOve\n",ideal_under,ideal_over);
- printf("local_rate: %d, idealweight: %.3f, localweight: %.3f, total_weight: %.3f\n",
+ if (system_loglevel == LOG_DEBUG) {
+ printf("local_rate: %d, idealweight: %.3f, localweight: %.3f, total_weight: %.3f\n",
local_rate, idealweight, ident->localweight, total_weight);
+ }
+
+ if (printcounter <= 0) {
+ printlog(LOG_WARN, "%d %.1f %.1f %.1f %d %d %d %d %d %d ", local_rate, idealweight,
+ ident->localweight, total_weight, ftable->num_flows, ftable->num_flows_5k, ftable->num_flows_10k,
+ ftable->num_flows_20k, ftable->num_flows_50k, ftable->avg_rate);
+ printcounter = PRINT_COUNTER_RESET;
+ } else {
+ printcounter -= 1;
+ }
//printf("Dampen: %d, dampen_increase: %d, peer_weights: %.3f, regime: %d\n",
// dampen, dampen_increase, peer_weights, regime);
- //printf("normal_count: %d, flowcount_count: %d\n", underlimit_normal_count, underlimit_flowcount_count);
-
if (regime == 3) {
printlog(LOG_DEBUG, "MIN: min said to use flow counting, which was %.3f when other method said %.3f.\n",
ideal_over, ideal_under);
dropprob = 0.0;
}
- //printf("local rate: %d, aggregate demand: %.3f, drop prob: %.3f\n",
- // ident->common.rate, aggdemand, dropprob);
+ if (system_loglevel == LOG_DEBUG) {
+ printf("local rate: %d, aggregate demand: %.3f, drop prob: %.3f\n",
+ ident->common.rate, aggdemand, dropprob);
+ }
return dropprob;
}
double comm_val = 0;
/* Read comm_val from comm layer. */
- read_comm(&ident->comm, &comm_val);
+ if (limiter->policy == POLICY_FPS) {
+ read_comm(&ident->comm, &comm_val,
+ ident->total_weight / (double) (ident->comm.remote_node_count + 1));
+ } else {
+ read_comm(&ident->comm, &comm_val,
+ (double) (ident->limit / (double) (ident->comm.remote_node_count + 1)));
+ }
printlog(LOG_DEBUG, "%.3f Aggregate weight/rate (FPS/GRD)\n", comm_val);
/* Experimental printing. */
(double) ident->common.rate / (double) 128, ident->id);
ident->avg_bytes += ident->common.rate;
- if (limiter->policynum == POLICY_FPS) {
+ if (limiter->policy == POLICY_FPS) {
ident->locallimit = allocate_fps(ident, comm_val);
ident->last_localweight = ident->localweight;
write_local_value(&ident->comm, ident->localweight);
} else {
ident->locallimit = 0; /* Unused with GRD. */
- ident->last_localdropprob = ident->localdropprob;
- ident->localdropprob = allocate_grd(ident, comm_val);
+ ident->last_drop_prob = ident->drop_prob;
+ ident->drop_prob = allocate_grd(ident, comm_val);
/* Update other limiters with our rate by writing to comm layer. */
write_local_value(&ident->comm, ident->common.rate);
ident->common.last_rate = ident->common.rate;
}
+/**
+ * Traces all of the parent pointers of a leaf all the way to the root in
+ * order to find the maximum drop probability in the chain.
+ */
+static double find_leaf_drop_prob(leaf_t *leaf) {
+ identity_t *current = leaf->parent;
+ double result = 0;
+
+ assert(current);
+
+ while (current != NULL) {
+ if (current->drop_prob > result) {
+ result = current->drop_prob;
+ }
+ current = current->parent;
+ }
+
+ return result;
+}
+
/**
* This is called once per estimate interval to enforce the rate that allocate
* has decided upon. It makes calls to tc using system().
static void enforce(limiter_t *limiter, identity_t *ident) {
char cmd[CMD_BUFFER_SIZE];
int ret = 0;
+ int i = 0;
- switch (limiter->policynum) {
+ switch (limiter->policy) {
case POLICY_FPS:
/* TC treats limits of 0 (8bit) as unlimited, which causes the
* FLOW_START_THRESHOLD. */
if (ident->locallimit < FLOW_START_THRESHOLD) {
- ident->locallimit = FLOW_START_THRESHOLD * 2;
+ ident->locallimit = FLOW_START_THRESHOLD;
}
/* Do not allow the node to set a limit higher than its
ident->locallimit = limiter->nodelimit;
}
- printf("FPS: Setting local limit to %d\n", ident->locallimit);
+ if (system_loglevel == LOG_DEBUG) {
+ printf("FPS: Setting local limit to %d\n", ident->locallimit);
+ }
printlog(LOG_DEBUG, "%d Limit ID:%d\n", ident->locallimit, ident->id);
+ if (printcounter == PRINT_COUNTER_RESET) {
+ printlog(LOG_WARN, "%d\n", ident->locallimit);
+ }
+
snprintf(cmd, CMD_BUFFER_SIZE,
"/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %dbps quantum 1600",
ident->htb_parent, ident->htb_node, ident->locallimit);
- ret = system(cmd);
+ if (do_enforcement) {
+ ret = system(cmd);
- if (ret) {
- /* FIXME: call failed. What to do? */
+ if (ret) {
+ /* FIXME: call failed. What to do? */
+ printlog(LOG_CRITICAL, "***TC call failed?***\n");
+ }
}
break;
case POLICY_GRD:
-/* FIXME: Figure out where to enforce GRD. */
-#if 0
- for (i = 0; i < ident->num_slices; i++){
-
- sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 40ms",
- ident->xids[i],ident->xids[i], (100*ident->localdropprob));
-
- ret = system(cmd);
-
- if (ret==-1)
- print_system_error(ret);
- }
+ for (i = 0; i < ident->leaf_count; ++i) {
+ if (ident->drop_prob >= ident->leaves[i]->drop_prob) {
+ /* The new drop probability for this identity is greater
+ * than or equal to the leaf's current drop probability.
+ * We can safely use the larger value at this leaf
+ * immediately. */
+ ident->leaves[i]->drop_prob = ident->drop_prob;
+ } else if (ident->last_drop_prob < ident->leaves[i]->drop_prob) {
+ /* The old drop probability for this identity is less than
+ * the leaf's current drop probability. This means that
+ * this identity couldn't have been the limiting ident,
+ * so nothing needs to be done because the old limiting
+ * ident is still the limiting factor. */
+
+ /* Intentionally blank. */
+ } else {
+ /* If neither of the above are true, then...
+ * 1) The new drop probability for the identity is less
+ * than what it previously was, and
+ * 2) This ident may have had the maximum drop probability
+ * of all idents limiting this leaf, and therefore we need
+ * to follow the leaf's parents up to the root to find the
+ * new leaf drop probability safely. */
+ ident->leaves[i]->drop_prob =
+ find_leaf_drop_prob(ident->leaves[i]);
+ }
+
+ /* Make the call to tc. */
+#ifdef DELAY40MS
+ snprintf(cmd, CMD_BUFFER_SIZE,
+ "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 40ms",
+ ident->leaves[i]->xid, ident->leaves[i]->xid,
+ (100 * ident->leaves[i]->drop_prob));
+#else
+ snprintf(cmd, CMD_BUFFER_SIZE,
+ "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 0ms",
+ ident->leaves[i]->xid, ident->leaves[i]->xid,
+ (100 * ident->leaves[i]->drop_prob));
#endif
+ if (do_enforcement) {
+ ret = system(cmd);
+
+ if (ret) {
+ /* FIXME: call failed. What to do? */
+ printlog(LOG_CRITICAL, "***TC call failed?***\n");
+ }
+ }
+ }
+
break;
default:
- printlog(LOG_CRITICAL, "DRL enforce: unknown policy %d\n",limiter->policynum);
+ printlog(LOG_CRITICAL, "DRL enforce: unknown policy %d\n",limiter->policy);
break;
}
sigemptyset(&signal_mask);
sigaddset(&signal_mask, SIGHUP);
+ sigaddset(&signal_mask, SIGUSR1);
pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
/* Determine the number of intervals we should wait before hitting the