From: Kevin Webb Date: Sat, 8 Nov 2008 00:35:54 +0000 (+0000) Subject: Reincarnated GRD. Changed mesh decay to go to 1/N rather than 0. X-Git-Tag: DistributedRateLimiting-0.1-0~48 X-Git-Url: http://git.onelab.eu/?p=distributedratelimiting.git;a=commitdiff_plain;h=19bf89f36a91be2fdd4a0b6c7099f7515507e1e1 Reincarnated GRD. Changed mesh decay to go to 1/N rather than 0. --- diff --git a/drl/drl_state.c b/drl/drl_state.c index 8583d0d..a30337b 100644 --- a/drl/drl_state.c +++ b/drl/drl_state.c @@ -118,7 +118,7 @@ void free_comm(comm_t *comm) { } } -int read_comm(comm_t *comm, double *aggregate) { +int read_comm(comm_t *comm, double *aggregate, double decayto) { remote_limiter_t *remote; pthread_mutex_lock(&comm->lock); @@ -131,9 +131,10 @@ int read_comm(comm_t *comm, double *aggregate) { *aggregate += remote->rate; /* If we continue to read it without having heard an update, - * we start to decay its value. */ + * we start to make the peer's value approach decayto, getting + * half of the way there each time. */ if (remote->awol >= REMOTE_AWOL_THRESHOLD) { - remote->rate = remote->rate / 2; + remote->rate += ((decayto - remote->rate) / 2); } else { remote->awol++; } diff --git a/drl/drl_state.h b/drl/drl_state.h index 2b1ec5e..7715a36 100644 --- a/drl/drl_state.h +++ b/drl/drl_state.h @@ -31,7 +31,7 @@ #define MAX_IDENTS (1024) #define MAX_LIMITERS (128) -#define REMOTE_AWOL_THRESHOLD (3) +#define REMOTE_AWOL_THRESHOLD (5) enum transports { UDP, TCP }; @@ -208,9 +208,13 @@ void free_comm(comm_t *comm); * @param aggregate The location at which the aggregate value will * be stored. * + * @param decayto When using a mesh comm fabric, limiters whose value + * has not been heard in several timesteps will decay to this value. + * Generally globallimit/N. + * * @returns 0 on success, EINVAL on error. */ -int read_comm(comm_t *comm, double *aggregate); +int read_comm(comm_t *comm, double *aggregate, double decayto); /** * Updates the locally observed value of an identity. diff --git a/drl/estimate.c b/drl/estimate.c index f1dd142..ed08447 100644 --- a/drl/estimate.c +++ b/drl/estimate.c @@ -7,6 +7,8 @@ * Kevin Webb 2007/2008 */ +#include + /** The size of the buffer we use to hold tc commands. */ #define CMD_BUFFER_SIZE 200 @@ -204,7 +206,7 @@ static uint32_t allocate_fps(identity_t *ident, double total_weight) { } /* Convert weight into a rate - add in our new local weight */ - total_weight = ident->localweight + peer_weights; + ident->total_weight = total_weight = ident->localweight + peer_weights; /* compute local allocation: if there is traffic elsewhere, use the weights @@ -258,8 +260,8 @@ static double allocate_grd(identity_t *ident, double aggdemand) { dropprob = 0.0; } - //printf("local rate: %d, aggregate demand: %.3f, drop prob: %.3f\n", - // ident->common.rate, aggdemand, dropprob); + printf("local rate: %d, aggregate demand: %.3f, drop prob: %.3f\n", + ident->common.rate, aggdemand, dropprob); return dropprob; } @@ -273,7 +275,13 @@ static void allocate(limiter_t *limiter, identity_t *ident) { double comm_val = 0; /* Read comm_val from comm layer. */ - read_comm(&ident->comm, &comm_val); + if (limiter->policy == POLICY_FPS) { + read_comm(&ident->comm, &comm_val, + ident->total_weight / (double) (ident->comm.remote_node_count + 1)); + } else { + read_comm(&ident->comm, &comm_val, + (double) (ident->limit / (double) (ident->comm.remote_node_count + 1))); + } printlog(LOG_DEBUG, "%.3f Aggregate weight/rate (FPS/GRD)\n", comm_val); /* Experimental printing. */ @@ -281,7 +289,7 @@ static void allocate(limiter_t *limiter, identity_t *ident) { (double) ident->common.rate / (double) 128, ident->id); ident->avg_bytes += ident->common.rate; - if (limiter->policynum == POLICY_FPS) { + if (limiter->policy == POLICY_FPS) { ident->locallimit = allocate_fps(ident, comm_val); ident->last_localweight = ident->localweight; @@ -289,8 +297,8 @@ static void allocate(limiter_t *limiter, identity_t *ident) { write_local_value(&ident->comm, ident->localweight); } else { ident->locallimit = 0; /* Unused with GRD. */ - ident->last_localdropprob = ident->localdropprob; - ident->localdropprob = allocate_grd(ident, comm_val); + ident->last_drop_prob = ident->drop_prob; + ident->drop_prob = allocate_grd(ident, comm_val); /* Update other limiters with our rate by writing to comm layer. */ write_local_value(&ident->comm, ident->common.rate); @@ -300,6 +308,26 @@ static void allocate(limiter_t *limiter, identity_t *ident) { ident->common.last_rate = ident->common.rate; } +/** + * Traces all of the parent pointers of a leaf all the way to the root in + * order to find the maximum drop probability in the chain. + */ +static double find_leaf_drop_prob(leaf_t *leaf) { + identity_t *current = leaf->parent; + double result = 0; + + assert(current); + + while (current != NULL) { + if (current->drop_prob > result) { + result = current->drop_prob; + } + current = current->parent; + } + + return result; +} + /** * This is called once per estimate interval to enforce the rate that allocate * has decided upon. It makes calls to tc using system(). @@ -307,8 +335,9 @@ static void allocate(limiter_t *limiter, identity_t *ident) { static void enforce(limiter_t *limiter, identity_t *ident) { char cmd[CMD_BUFFER_SIZE]; int ret = 0; + int i = 0; - switch (limiter->policynum) { + switch (limiter->policy) { case POLICY_FPS: /* TC treats limits of 0 (8bit) as unlimited, which causes the @@ -344,23 +373,56 @@ static void enforce(limiter_t *limiter, identity_t *ident) { break; case POLICY_GRD: -/* FIXME: Figure out where to enforce GRD. */ -#if 0 - for (i = 0; i < ident->num_slices; i++){ - - sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 40ms", - ident->xids[i],ident->xids[i], (100*ident->localdropprob)); - + for (i = 0; i < ident->leaf_count; ++i) { + if (ident->drop_prob >= ident->leaves[i]->drop_prob) { + /* The new drop probability for this identity is greater + * than or equal to the leaf's current drop probability. + * We can safely use the larger value at this leaf + * immediately. */ + ident->leaves[i]->drop_prob = ident->drop_prob; + } else if (ident->last_drop_prob < ident->leaves[i]->drop_prob) { + /* The old drop probability for this identity is less than + * the leaf's current drop probability. This means that + * this identity couldn't have been the limiting ident, + * so nothing needs to be done because the old limiting + * ident is still the limiting factor. */ + + /* Intentionally blank. */ + } else { + /* If neither of the above are true, then... + * 1) The new drop probability for the identity is less + * than what it previously was, and + * 2) This ident may have had the maximum drop probability + * of all idents limiting this leaf, and therefore we need + * to follow the leaf's parents up to the root to find the + * new leaf drop probability safely. */ + ident->leaves[i]->drop_prob = + find_leaf_drop_prob(ident->leaves[i]); + } + + /* Make the call to tc. */ +#ifdef DELAY40MS + snprintf(cmd, CMD_BUFFER_SIZE, + "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 40ms", + ident->leaves[i]->xid, ident->leaves[i]->xid, + (100 * ident->leaves[i]->drop_prob)); +#else + snprintf(cmd, CMD_BUFFER_SIZE, + "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 0ms", + ident->leaves[i]->xid, ident->leaves[i]->xid, + (100 * ident->leaves[i]->drop_prob)); +#endif ret = system(cmd); - if (ret==-1) - print_system_error(ret); + if (ret) { + /* FIXME: call failed. What to do? */ + } } -#endif + break; default: - printlog(LOG_CRITICAL, "DRL enforce: unknown policy %d\n",limiter->policynum); + printlog(LOG_CRITICAL, "DRL enforce: unknown policy %d\n",limiter->policy); break; } diff --git a/drl/ratetypes.h b/drl/ratetypes.h index c88c931..59a481a 100644 --- a/drl/ratetypes.h +++ b/drl/ratetypes.h @@ -41,6 +41,13 @@ typedef struct identity { /** Pointer to the identity's parent in the HTB hierarchy. */ struct identity *parent; + /** Array of the leaves that are limited by this identity. Points into the + * leaves array for the identity's instance. */ + struct leaf **leaves; + + /** The number of leaves for which this identity is responsible. */ + int leaf_count; + /** The fixed (per second) EWMA weight. */ double fixed_ewma_weight; @@ -71,6 +78,8 @@ typedef struct identity { /** FPS previous weight value. */ double last_localweight; + double total_weight; + /** A flag to indicate whether or not the identity is in the flowstart * state. During flowstart, the identity's limit is raised to allow for * flows to grow before incurring losses. */ @@ -79,10 +88,10 @@ typedef struct identity { /* GRD */ /** GRD drop probability information. */ - double localdropprob; + double drop_prob; /** GRD previous drop probability information. */ - double last_localdropprob; + double last_drop_prob; /* Flow accounting machinery. */ @@ -139,6 +148,9 @@ typedef struct leaf { /** The leaf's parent in the hierarchy. This is the identity to which this * leaf belongs. */ identity_t *parent; + + /** GRD: The leaf's packet drop probability. */ + double drop_prob; } leaf_t; /** @@ -194,7 +206,7 @@ typedef struct limiter { uint32_t nodelimit; /** The DRL policy (GRD, FPS) this node is using. */ - enum policies policynum; + enum policies policy; /** The estimate interval (in milliseconds). */ int estintms; diff --git a/drl/ulogd_DRL.c b/drl/ulogd_DRL.c index 5d7ca7e..ef2a17b 100644 --- a/drl/ulogd_DRL.c +++ b/drl/ulogd_DRL.c @@ -766,8 +766,37 @@ static int validate_configs(parsed_configs configs, drl_instance_t *instance) { return 0; } +static int fill_set_leaf_pointer(drl_instance_t *instance, identity_t *ident) { + int count = 0; + identity_t *current_ident; + leaf_t *current_leaf; + leaf_t **leaves = malloc(instance->leaf_count * sizeof(leaf_t *)); + if (leaves == NULL) { + return 1; + } + + map_reset_iterate(instance->leaf_map); + while ((current_leaf = (leaf_t *) map_next(instance->leaf_map))) { + current_ident = current_leaf->parent; + while (current_ident != NULL && current_ident != instance->last_machine) { + if (current_ident == ident) { + /* Found the ident we were looking for - add the leaf. */ + leaves[count] = current_leaf; + count += 1; + break; + } + current_ident = current_ident->parent; + } + } + + ident->leaves = leaves; + ident->leaf_count = count; + + return 0; +} + static int init_identities(parsed_configs configs, drl_instance_t *instance) { - int i; + int i, j; ident_config *config = configs.machines; leaf_t *leaf = NULL; @@ -817,6 +846,18 @@ static int init_identities(parsed_configs configs, drl_instance_t *instance) { TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK), instance->machines[i], calendar); + + /* Setup the array of pointers to leaves. This is easy for machines + * because a machine node applies to every leaf. */ + instance->machines[i]->leaves = + malloc(instance->leaf_count * sizeof(leaf_t *)); + if (instance->machines[i]->leaves == NULL) { + return ENOMEM; + } + instance->machines[i]->leaf_count = instance->leaf_count; + for (j = 0; j < instance->leaf_count; ++j) { + instance->machines[i]->leaves[j] = &instance->leaves[j]; + } } /* Connect the set subtree to the machines. Any set or leaf without a @@ -838,6 +879,13 @@ static int init_identities(parsed_configs configs, drl_instance_t *instance) { TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK), instance->sets[i], calendar); + + /* Setup the array of pointers to leaves. This is harder for sets, + * but this doesn't need to be super-efficient because it happens + * rarely and it isn't on the critical path for reconfig(). */ + if (fill_set_leaf_pointer(instance, instance->sets[i])) { + return ENOMEM; + } } /* Success. */ @@ -1039,6 +1087,94 @@ static int create_htb_hierarchy(drl_instance_t *instance) { } printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); +#ifdef DELAY40MS + /* Only for artificial delay testing. */ + sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo"); + execute_cmd(cmd); + + sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 40ms"); + execute_cmd(cmd); + sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:11f9 handle 11f9 pfifo"); + execute_cmd(cmd); + + sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:11f9 handle 11f9 netem loss 0 delay 40ms"); + execute_cmd(cmd); + sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:11fa handle 11fa pfifo"); + execute_cmd(cmd); + + sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:11fa handle 11fa netem loss 0 delay 40ms"); + execute_cmd(cmd); + /* End delay testing */ +#endif + + return 0; +} + +static int setup_tc_grd(drl_instance_t *instance) { + int i; + char cmd[300]; + + for (i = 0; i < instance->leaf_count; ++i) { + /* Delete the old pfifo qdisc that might have been there before. */ + sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1%x handle 1%x pfifo", + instance->leaves[i].xid, instance->leaves[i].xid); + + if (execute_cmd(cmd)) { + //FIXME: remove this print and do a log. + printf("GRD: pfifo qdisc wasn't there!\n"); + } + + /* Add the netem qdisc. */ +#ifdef DELAY40MS + sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 40ms", + instance->leaves[i].xid, instance->leaves[i].xid); +#else + sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 0ms", + instance->leaves[i].xid, instance->leaves[i].xid); +#endif + + if (execute_cmd(cmd)) { + return 1; + } + } + + /* Do the same for 1000 and 1fff. */ + sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo"); + + if (execute_cmd(cmd)) { + //FIXME: remove this print and do a log. + printf("GRD: pfifo qdisc wasn't there!\n"); + } + + /* Add the netem qdisc. */ +#ifdef DELAY40MS + sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 40ms"); +#else + sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 0ms"); +#endif + + if (execute_cmd(cmd)) { + return 1; + } + + sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1fff handle 1fff pfifo"); + + if (execute_cmd(cmd)) { + //FIXME: remove this print and do a log. + printf("GRD: pfifo qdisc wasn't there!\n"); + } + + /* Add the netem qdisc. */ +#ifdef DELAY40MS + sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 40ms"); +#else + sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 0ms"); +#endif + + if (execute_cmd(cmd)) { + return 1; + } + return 0; } @@ -1097,9 +1233,9 @@ static int init_drl(void) { printlog(LOG_WARN, " POLICY: %s\n",policy.u.string); if (strcasecmp(policy.u.string,"GRD") == 0) { - limiter.policynum = POLICY_GRD; + limiter.policy = POLICY_GRD; } else if (strcasecmp(policy.u.string,"FPS") == 0) { - limiter.policynum = POLICY_FPS; + limiter.policy = POLICY_FPS; } else { printlog(LOG_CRITICAL, "Unknown DRL policy %s, aborting.\n",policy.u.string); @@ -1155,13 +1291,27 @@ static int init_drl(void) { /* Debugging - FIXME: remove this? */ print_instance(&limiter.stable_instance); - if (assign_htb_hierarchy(&limiter.stable_instance)) { - free_instance(&limiter.stable_instance); - return false; - } + switch (limiter.policy) { + case POLICY_FPS: + if (assign_htb_hierarchy(&limiter.stable_instance)) { + free_instance(&limiter.stable_instance); + return false; + } + + if (create_htb_hierarchy(&limiter.stable_instance)) { + free_instance(&limiter.stable_instance); + return false; + } + break; - if (create_htb_hierarchy(&limiter.stable_instance)) { - free_instance(&limiter.stable_instance); + case POLICY_GRD: + if (setup_tc_grd(&limiter.stable_instance)) { + free_instance(&limiter.stable_instance); + return false; + } + break; + + default: return false; } @@ -1204,9 +1354,6 @@ static void reconfig() { return; } - /* Lock */ - pthread_rwlock_wrlock(&limiter.limiter_lock); - if (validate_configs(configs, &limiter.new_instance)) { free_failed_config(configs, &limiter.new_instance); printlog(LOG_CRITICAL, "Validation failed during reconfig().\n"); @@ -1226,28 +1373,56 @@ static void reconfig() { /* Debugging - FIXME: remove this? */ print_instance(&limiter.new_instance); + + /* Lock */ + pthread_rwlock_wrlock(&limiter.limiter_lock); - if (assign_htb_hierarchy(&limiter.new_instance)) { - free_instance(&limiter.new_instance); - printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy during reconfig().\n"); - pthread_rwlock_unlock(&limiter.limiter_lock); - return; - } + switch (limiter.policy) { + case POLICY_FPS: + if (assign_htb_hierarchy(&limiter.new_instance)) { + free_instance(&limiter.new_instance); + printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy during reconfig().\n"); + pthread_rwlock_unlock(&limiter.limiter_lock); + return; + } - if (create_htb_hierarchy(&limiter.new_instance)) { - free_instance(&limiter.new_instance); - printlog(LOG_CRITICAL, "Failed to create HTB hierarchy during reconfig().\n"); + if (create_htb_hierarchy(&limiter.new_instance)) { + free_instance(&limiter.new_instance); + printlog(LOG_CRITICAL, "Failed to create HTB hierarchy during reconfig().\n"); + + /* Re-create old instance. */ + if (create_htb_hierarchy(&limiter.stable_instance)) { + /* Error reinstating the old one - big problem. */ + printlog(LOG_CRITICAL, "Failed to reinstate HTB hierarchy during reconfig().\n"); + printlog(LOG_CRITICAL, "Giving up...\n"); + flushlog(); + exit(EXIT_FAILURE); + } - /* Re-create old instance. */ - if (create_htb_hierarchy(&limiter.stable_instance)) { - /* Error reinstating the old one - big problem. */ - printlog(LOG_CRITICAL, "Failed to reinstate HTB hierarchy during reconfig().\n"); - flushlog(); - exit(EXIT_FAILURE); - } + pthread_rwlock_unlock(&limiter.limiter_lock); + return; + } + break; + + case POLICY_GRD: + if (setup_tc_grd(&limiter.new_instance)) { + free_instance(&limiter.new_instance); + printlog(LOG_CRITICAL, "GRD tc calls failed during reconfig().\n"); + + /* Try to re-create old instance. */ + if (setup_tc_grd(&limiter.stable_instance)) { + printlog(LOG_CRITICAL, "Failed to reinstate old GRD qdiscs during reconfig().\n"); + printlog(LOG_CRITICAL, "Giving up...\n"); + flushlog(); + exit(EXIT_FAILURE); + } + } + break; - pthread_rwlock_unlock(&limiter.limiter_lock); - return; + default: + /* Should be impossible. */ + printf("Pigs are flying?\n"); + exit(EXIT_FAILURE); } /* Switch over new to stable instance. */