* Kevin Webb 2007/2008
*/
+#include <assert.h>
+
/** The size of the buffer we use to hold tc commands. */
#define CMD_BUFFER_SIZE 200
}
/* Convert weight into a rate - add in our new local weight */
- total_weight = ident->localweight + peer_weights;
+ ident->total_weight = total_weight = ident->localweight + peer_weights;
/* compute local allocation:
if there is traffic elsewhere, use the weights
dropprob = 0.0;
}
- //printf("local rate: %d, aggregate demand: %.3f, drop prob: %.3f\n",
- // ident->common.rate, aggdemand, dropprob);
+ printf("local rate: %d, aggregate demand: %.3f, drop prob: %.3f\n",
+ ident->common.rate, aggdemand, dropprob);
return dropprob;
}
double comm_val = 0;
/* Read comm_val from comm layer. */
- read_comm(&ident->comm, &comm_val);
+ if (limiter->policy == POLICY_FPS) {
+ read_comm(&ident->comm, &comm_val,
+ ident->total_weight / (double) (ident->comm.remote_node_count + 1));
+ } else {
+ read_comm(&ident->comm, &comm_val,
+ (double) (ident->limit / (double) (ident->comm.remote_node_count + 1)));
+ }
printlog(LOG_DEBUG, "%.3f Aggregate weight/rate (FPS/GRD)\n", comm_val);
/* Experimental printing. */
(double) ident->common.rate / (double) 128, ident->id);
ident->avg_bytes += ident->common.rate;
- if (limiter->policynum == POLICY_FPS) {
+ if (limiter->policy == POLICY_FPS) {
ident->locallimit = allocate_fps(ident, comm_val);
ident->last_localweight = ident->localweight;
write_local_value(&ident->comm, ident->localweight);
} else {
ident->locallimit = 0; /* Unused with GRD. */
- ident->last_localdropprob = ident->localdropprob;
- ident->localdropprob = allocate_grd(ident, comm_val);
+ ident->last_drop_prob = ident->drop_prob;
+ ident->drop_prob = allocate_grd(ident, comm_val);
/* Update other limiters with our rate by writing to comm layer. */
write_local_value(&ident->comm, ident->common.rate);
ident->common.last_rate = ident->common.rate;
}
+/**
+ * Traces all of the parent pointers of a leaf all the way to the root in
+ * order to find the maximum drop probability in the chain.
+ */
+static double find_leaf_drop_prob(leaf_t *leaf) {
+ identity_t *current = leaf->parent;
+ double result = 0;
+
+ assert(current);
+
+ while (current != NULL) {
+ if (current->drop_prob > result) {
+ result = current->drop_prob;
+ }
+ current = current->parent;
+ }
+
+ return result;
+}
+
/**
* This is called once per estimate interval to enforce the rate that allocate
* has decided upon. It makes calls to tc using system().
static void enforce(limiter_t *limiter, identity_t *ident) {
char cmd[CMD_BUFFER_SIZE];
int ret = 0;
+ int i = 0;
- switch (limiter->policynum) {
+ switch (limiter->policy) {
case POLICY_FPS:
/* TC treats limits of 0 (8bit) as unlimited, which causes the
break;
case POLICY_GRD:
-/* FIXME: Figure out where to enforce GRD. */
-#if 0
- for (i = 0; i < ident->num_slices; i++){
-
- sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 40ms",
- ident->xids[i],ident->xids[i], (100*ident->localdropprob));
-
+ for (i = 0; i < ident->leaf_count; ++i) {
+ if (ident->drop_prob >= ident->leaves[i]->drop_prob) {
+ /* The new drop probability for this identity is greater
+ * than or equal to the leaf's current drop probability.
+ * We can safely use the larger value at this leaf
+ * immediately. */
+ ident->leaves[i]->drop_prob = ident->drop_prob;
+ } else if (ident->last_drop_prob < ident->leaves[i]->drop_prob) {
+ /* The old drop probability for this identity is less than
+ * the leaf's current drop probability. This means that
+ * this identity couldn't have been the limiting ident,
+ * so nothing needs to be done because the old limiting
+ * ident is still the limiting factor. */
+
+ /* Intentionally blank. */
+ } else {
+ /* If neither of the above are true, then...
+ * 1) The new drop probability for the identity is less
+ * than what it previously was, and
+ * 2) This ident may have had the maximum drop probability
+ * of all idents limiting this leaf, and therefore we need
+ * to follow the leaf's parents up to the root to find the
+ * new leaf drop probability safely. */
+ ident->leaves[i]->drop_prob =
+ find_leaf_drop_prob(ident->leaves[i]);
+ }
+
+ /* Make the call to tc. */
+#ifdef DELAY40MS
+ snprintf(cmd, CMD_BUFFER_SIZE,
+ "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 40ms",
+ ident->leaves[i]->xid, ident->leaves[i]->xid,
+ (100 * ident->leaves[i]->drop_prob));
+#else
+ snprintf(cmd, CMD_BUFFER_SIZE,
+ "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 0ms",
+ ident->leaves[i]->xid, ident->leaves[i]->xid,
+ (100 * ident->leaves[i]->drop_prob));
+#endif
ret = system(cmd);
- if (ret==-1)
- print_system_error(ret);
+ if (ret) {
+ /* FIXME: call failed. What to do? */
+ }
}
-#endif
+
break;
default:
- printlog(LOG_CRITICAL, "DRL enforce: unknown policy %d\n",limiter->policynum);
+ printlog(LOG_CRITICAL, "DRL enforce: unknown policy %d\n",limiter->policy);
break;
}
return 0;
}
+static int fill_set_leaf_pointer(drl_instance_t *instance, identity_t *ident) {
+ int count = 0;
+ identity_t *current_ident;
+ leaf_t *current_leaf;
+ leaf_t **leaves = malloc(instance->leaf_count * sizeof(leaf_t *));
+ if (leaves == NULL) {
+ return 1;
+ }
+
+ map_reset_iterate(instance->leaf_map);
+ while ((current_leaf = (leaf_t *) map_next(instance->leaf_map))) {
+ current_ident = current_leaf->parent;
+ while (current_ident != NULL && current_ident != instance->last_machine) {
+ if (current_ident == ident) {
+ /* Found the ident we were looking for - add the leaf. */
+ leaves[count] = current_leaf;
+ count += 1;
+ break;
+ }
+ current_ident = current_ident->parent;
+ }
+ }
+
+ ident->leaves = leaves;
+ ident->leaf_count = count;
+
+ return 0;
+}
+
static int init_identities(parsed_configs configs, drl_instance_t *instance) {
- int i;
+ int i, j;
ident_config *config = configs.machines;
leaf_t *leaf = NULL;
TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
instance->machines[i], calendar);
+
+ /* Setup the array of pointers to leaves. This is easy for machines
+ * because a machine node applies to every leaf. */
+ instance->machines[i]->leaves =
+ malloc(instance->leaf_count * sizeof(leaf_t *));
+ if (instance->machines[i]->leaves == NULL) {
+ return ENOMEM;
+ }
+ instance->machines[i]->leaf_count = instance->leaf_count;
+ for (j = 0; j < instance->leaf_count; ++j) {
+ instance->machines[i]->leaves[j] = &instance->leaves[j];
+ }
}
/* Connect the set subtree to the machines. Any set or leaf without a
TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
instance->sets[i], calendar);
+
+ /* Setup the array of pointers to leaves. This is harder for sets,
+ * but this doesn't need to be super-efficient because it happens
+ * rarely and it isn't on the critical path for reconfig(). */
+ if (fill_set_leaf_pointer(instance, instance->sets[i])) {
+ return ENOMEM;
+ }
}
/* Success. */
}
printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
+#ifdef DELAY40MS
+ /* Only for artificial delay testing. */
+ sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo");
+ execute_cmd(cmd);
+
+ sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 40ms");
+ execute_cmd(cmd);
+ sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:11f9 handle 11f9 pfifo");
+ execute_cmd(cmd);
+
+ sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:11f9 handle 11f9 netem loss 0 delay 40ms");
+ execute_cmd(cmd);
+ sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:11fa handle 11fa pfifo");
+ execute_cmd(cmd);
+
+ sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:11fa handle 11fa netem loss 0 delay 40ms");
+ execute_cmd(cmd);
+ /* End delay testing */
+#endif
+
+ return 0;
+}
+
+static int setup_tc_grd(drl_instance_t *instance) {
+ int i;
+ char cmd[300];
+
+ for (i = 0; i < instance->leaf_count; ++i) {
+ /* Delete the old pfifo qdisc that might have been there before. */
+ sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1%x handle 1%x pfifo",
+ instance->leaves[i].xid, instance->leaves[i].xid);
+
+ if (execute_cmd(cmd)) {
+ //FIXME: remove this print and do a log.
+ printf("GRD: pfifo qdisc wasn't there!\n");
+ }
+
+ /* Add the netem qdisc. */
+#ifdef DELAY40MS
+ sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 40ms",
+ instance->leaves[i].xid, instance->leaves[i].xid);
+#else
+ sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 0ms",
+ instance->leaves[i].xid, instance->leaves[i].xid);
+#endif
+
+ if (execute_cmd(cmd)) {
+ return 1;
+ }
+ }
+
+ /* Do the same for 1000 and 1fff. */
+ sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo");
+
+ if (execute_cmd(cmd)) {
+ //FIXME: remove this print and do a log.
+ printf("GRD: pfifo qdisc wasn't there!\n");
+ }
+
+ /* Add the netem qdisc. */
+#ifdef DELAY40MS
+ sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 40ms");
+#else
+ sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 0ms");
+#endif
+
+ if (execute_cmd(cmd)) {
+ return 1;
+ }
+
+ sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1fff handle 1fff pfifo");
+
+ if (execute_cmd(cmd)) {
+ //FIXME: remove this print and do a log.
+ printf("GRD: pfifo qdisc wasn't there!\n");
+ }
+
+ /* Add the netem qdisc. */
+#ifdef DELAY40MS
+ sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 40ms");
+#else
+ sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 0ms");
+#endif
+
+ if (execute_cmd(cmd)) {
+ return 1;
+ }
+
return 0;
}
printlog(LOG_WARN, " POLICY: %s\n",policy.u.string);
if (strcasecmp(policy.u.string,"GRD") == 0) {
- limiter.policynum = POLICY_GRD;
+ limiter.policy = POLICY_GRD;
} else if (strcasecmp(policy.u.string,"FPS") == 0) {
- limiter.policynum = POLICY_FPS;
+ limiter.policy = POLICY_FPS;
} else {
printlog(LOG_CRITICAL,
"Unknown DRL policy %s, aborting.\n",policy.u.string);
/* Debugging - FIXME: remove this? */
print_instance(&limiter.stable_instance);
- if (assign_htb_hierarchy(&limiter.stable_instance)) {
- free_instance(&limiter.stable_instance);
- return false;
- }
+ switch (limiter.policy) {
+ case POLICY_FPS:
+ if (assign_htb_hierarchy(&limiter.stable_instance)) {
+ free_instance(&limiter.stable_instance);
+ return false;
+ }
+
+ if (create_htb_hierarchy(&limiter.stable_instance)) {
+ free_instance(&limiter.stable_instance);
+ return false;
+ }
+ break;
- if (create_htb_hierarchy(&limiter.stable_instance)) {
- free_instance(&limiter.stable_instance);
+ case POLICY_GRD:
+ if (setup_tc_grd(&limiter.stable_instance)) {
+ free_instance(&limiter.stable_instance);
+ return false;
+ }
+ break;
+
+ default:
return false;
}
return;
}
- /* Lock */
- pthread_rwlock_wrlock(&limiter.limiter_lock);
-
if (validate_configs(configs, &limiter.new_instance)) {
free_failed_config(configs, &limiter.new_instance);
printlog(LOG_CRITICAL, "Validation failed during reconfig().\n");
/* Debugging - FIXME: remove this? */
print_instance(&limiter.new_instance);
+
+ /* Lock */
+ pthread_rwlock_wrlock(&limiter.limiter_lock);
- if (assign_htb_hierarchy(&limiter.new_instance)) {
- free_instance(&limiter.new_instance);
- printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy during reconfig().\n");
- pthread_rwlock_unlock(&limiter.limiter_lock);
- return;
- }
+ switch (limiter.policy) {
+ case POLICY_FPS:
+ if (assign_htb_hierarchy(&limiter.new_instance)) {
+ free_instance(&limiter.new_instance);
+ printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy during reconfig().\n");
+ pthread_rwlock_unlock(&limiter.limiter_lock);
+ return;
+ }
- if (create_htb_hierarchy(&limiter.new_instance)) {
- free_instance(&limiter.new_instance);
- printlog(LOG_CRITICAL, "Failed to create HTB hierarchy during reconfig().\n");
+ if (create_htb_hierarchy(&limiter.new_instance)) {
+ free_instance(&limiter.new_instance);
+ printlog(LOG_CRITICAL, "Failed to create HTB hierarchy during reconfig().\n");
+
+ /* Re-create old instance. */
+ if (create_htb_hierarchy(&limiter.stable_instance)) {
+ /* Error reinstating the old one - big problem. */
+ printlog(LOG_CRITICAL, "Failed to reinstate HTB hierarchy during reconfig().\n");
+ printlog(LOG_CRITICAL, "Giving up...\n");
+ flushlog();
+ exit(EXIT_FAILURE);
+ }
- /* Re-create old instance. */
- if (create_htb_hierarchy(&limiter.stable_instance)) {
- /* Error reinstating the old one - big problem. */
- printlog(LOG_CRITICAL, "Failed to reinstate HTB hierarchy during reconfig().\n");
- flushlog();
- exit(EXIT_FAILURE);
- }
+ pthread_rwlock_unlock(&limiter.limiter_lock);
+ return;
+ }
+ break;
+
+ case POLICY_GRD:
+ if (setup_tc_grd(&limiter.new_instance)) {
+ free_instance(&limiter.new_instance);
+ printlog(LOG_CRITICAL, "GRD tc calls failed during reconfig().\n");
+
+ /* Try to re-create old instance. */
+ if (setup_tc_grd(&limiter.stable_instance)) {
+ printlog(LOG_CRITICAL, "Failed to reinstate old GRD qdiscs during reconfig().\n");
+ printlog(LOG_CRITICAL, "Giving up...\n");
+ flushlog();
+ exit(EXIT_FAILURE);
+ }
+ }
+ break;
- pthread_rwlock_unlock(&limiter.limiter_lock);
- return;
+ default:
+ /* Should be impossible. */
+ printf("Pigs are flying?\n");
+ exit(EXIT_FAILURE);
}
/* Switch over new to stable instance. */