From: Kevin Webb Date: Fri, 29 May 2009 16:56:05 +0000 (+0000) Subject: Long-outstanding commit. (Hopefully) Final version before running experiments for... X-Git-Tag: DistributedRateLimiting-0.1-0~28 X-Git-Url: http://git.onelab.eu/?p=distributedratelimiting.git;a=commitdiff_plain;h=17fba5cc2628df108d09ab4abbbd4b4a7437e7eb Long-outstanding commit. (Hopefully) Final version before running experiments for the paper. --- diff --git a/drl/config.c b/drl/config.c index 8b15cd2..42f4472 100644 --- a/drl/config.c +++ b/drl/config.c @@ -79,10 +79,13 @@ int get_eligible_leaves(drl_instance_t *instance) { return ENOMEM; } + memset(leaves, 0, count * sizeof(leaf_t)); for (i = 0; i < count; ++i) { leaves[i].xid = atoi(names[i]->d_name); leaves[i].parent = NULL; + leaves[i].drop_prob = 0.0; + leaves[i].delay = 0; free(names[i]); diff --git a/drl/estimate.c b/drl/estimate.c index 21edcbf..3cbf95a 100644 --- a/drl/estimate.c +++ b/drl/estimate.c @@ -38,7 +38,7 @@ static void estimate(identity_t *ident, const double estintms) { time_difference = timeval_subtract(now, ident->common.last_update); - if (time_difference > 1.05 * (estintms / 1000 * ident->mainloop_intervals)) { + if (time_difference > .01 + (estintms / 1000 * ident->mainloop_intervals)) { printlog(LOG_WARN, "Missed interval: Scheduled for %.2f ms, actual %.2fms\n", estintms * ident->mainloop_intervals, time_difference * 1000); } @@ -125,8 +125,8 @@ static double allocate_fps_over_limit(identity_t *ident) { static inline uint32_t close_enough(uint32_t limit) { uint32_t difference = limit - (limit * CLOSE_ENOUGH); - if (difference < 2500) { - return (limit - 2500); + if (difference < 10240) { + return (limit - 10240); } else { return (limit * CLOSE_ENOUGH); } @@ -219,6 +219,7 @@ static uint32_t allocate_fps(identity_t *ident, double total_weight, } Old flowstart code. #endif + //printf("rate is %d, close enough is %d, difference is %d\n", table->rate, close_enough(ident->locallimit), close_enough(ident->locallimit) - table->rate); /* Boost low-limits so that they have room to grow. */ if (table->rate < FLOW_START_THRESHOLD) { @@ -564,19 +565,47 @@ static uint32_t allocate_fps_old(identity_t *ident, double total_weight) { */ static double allocate_grd(identity_t *ident, double aggdemand) { double dropprob; - double global_limit = (double) (ident->limit); + double global_limit = ident->limit; + double min_dropprob = ident->drop_prob * GRD_BIG_DROP; + + struct timeval tv; + double time_now; + common_accounting_t *table = &ident->common; + + gettimeofday(&tv, NULL); + time_now = (double) tv.tv_sec + (double) ((double) tv.tv_usec / (double) 1000000); if (aggdemand > global_limit) { dropprob = (aggdemand-global_limit)/aggdemand; } else { dropprob = 0.0; } - + + if (dropprob > 0.01 && dropprob < min_dropprob) { + dropprob = min_dropprob; + } + if (system_loglevel == LOG_DEBUG) { printf("local rate: %d, aggregate demand: %.3f, drop prob: %.3f\n", ident->common.rate, aggdemand, dropprob); } + if (table->max_flow_rate > 0) { + printlog(LOG_WARN, "%.2f %d 0 0 %.2f %d %d %d %d %d %d %d %d %.2f ID:%d %.3f\n", + time_now, table->inst_rate, aggdemand, + table->num_flows, table->num_flows_5k, table->num_flows_10k, + table->num_flows_20k, table->num_flows_50k, table->avg_rate, + table->max_flow_rate, table->max_flow_rate_flow_hash, dropprob, + ident->id, (double) table->rate / (double) table->max_flow_rate); + } else { + printlog(LOG_WARN, "%.2f %d 0 0 %.2f %d %d %d %d %d %d %d %d %.2f ID:%d 0\n", + time_now, table->inst_rate, aggdemand, + table->num_flows, table->num_flows_5k, table->num_flows_10k, + table->num_flows_20k, table->num_flows_50k, table->avg_rate, + table->max_flow_rate, table->max_flow_rate_flow_hash, dropprob, + ident->id); + } + return dropprob; } @@ -617,7 +646,6 @@ static void allocate(limiter_t *limiter, identity_t *ident) { /* Update other limiters with our weight by writing to comm layer. */ write_local_value(&ident->comm, ident->localweight); } else { - ident->locallimit = 0; /* Unused with GRD. */ ident->last_drop_prob = ident->drop_prob; ident->drop_prob = allocate_grd(ident, comm_val); @@ -739,17 +767,11 @@ static void enforce(limiter_t *limiter, identity_t *ident) { } /* Make the call to tc. */ -#ifdef DELAY40MS snprintf(cmd, CMD_BUFFER_SIZE, - "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 40ms", + "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay %dms", ident->leaves[i]->xid, ident->leaves[i]->xid, - (100 * ident->leaves[i]->drop_prob)); -#else - snprintf(cmd, CMD_BUFFER_SIZE, - "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 0ms", - ident->leaves[i]->xid, ident->leaves[i]->xid, - (100 * ident->leaves[i]->drop_prob)); -#endif + (100 * ident->leaves[i]->drop_prob), ident->leaves[i]->delay); + if (do_enforcement) { ret = system(cmd); diff --git a/drl/peer_comm.c b/drl/peer_comm.c index 2da6737..eba9637 100644 --- a/drl/peer_comm.c +++ b/drl/peer_comm.c @@ -391,7 +391,9 @@ static void ident_receive(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock) } #endif -#define ALLOW_PARTITION +/* Turn this on to simulate network partitions. + * Turn off for production settings. */ +//#define ALLOW_PARTITION int send_udp_mesh(comm_t *comm, uint32_t id, int sock) { int result = 0; @@ -440,6 +442,8 @@ int send_udp_mesh(comm_t *comm, uint32_t id, int sock) { } } + partition_count += 1; + #endif toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */ @@ -450,7 +454,6 @@ int send_udp_mesh(comm_t *comm, uint32_t id, int sock) { printlog(LOG_WARN, " - The error was |%d|\n", strerror(result)); break; } - partition_count += 1; } return result; diff --git a/drl/raterouter.h b/drl/raterouter.h index 2594f12..37f0141 100644 --- a/drl/raterouter.h +++ b/drl/raterouter.h @@ -60,7 +60,9 @@ enum dampenings { DAMPEN_NONE = 0, DAMPEN_TEST = 1, DAMPEN_FAILED = 2, DAMPEN_PA */ #define FLOW_START_THRESHOLD (6000) -#define CLOSE_ENOUGH (0.99) +#define CLOSE_ENOUGH (0.90) + +#define GRD_BIG_DROP (0.90) /** * All fields come from the ip protocol header. @@ -79,7 +81,7 @@ enum dampenings { DAMPEN_NONE = 0, DAMPEN_TEST = 1, DAMPEN_FAILED = 2, DAMPEN_PA * specified in the config file. The second is a standard table with * "perfect" accounting so that we can compare the two. Turn this off for * any type of production setting. */ -#define SHADOW_ACCTING +//#define SHADOW_ACCTING /* forward declare some structs */ struct limiter; diff --git a/drl/ratetypes.h b/drl/ratetypes.h index dc7f6c9..c4f6b90 100644 --- a/drl/ratetypes.h +++ b/drl/ratetypes.h @@ -188,6 +188,10 @@ typedef struct leaf { /** GRD: The leaf's packet drop probability. */ double drop_prob; + + /** Only used for experimentation. */ + int delay; + } leaf_t; /** diff --git a/drl/samplehold.h b/drl/samplehold.h index 7e51623..fbd8d3d 100644 --- a/drl/samplehold.h +++ b/drl/samplehold.h @@ -31,11 +31,11 @@ #define RANDOM_GRANULARITY (1000) /* For simple S&H testing. */ -//#define SAMPLEHOLD_PERCENTAGE (20) -//#define SAMPLEHOLD_OVERFACTOR (5) - #define SAMPLEHOLD_PERCENTAGE (5) -#define SAMPLEHOLD_OVERFACTOR (10) +#define SAMPLEHOLD_OVERFACTOR (1.25) + +//#define SAMPLEHOLD_PERCENTAGE (5) +//#define SAMPLEHOLD_OVERFACTOR (10) #define SAMPLEHOLD_BONUS_FACTOR (1.05) /** In-table representation of a flow that has been sampled. */ diff --git a/drl/ulogd_DRL.c b/drl/ulogd_DRL.c index 79cb4ef..83e5f13 100644 --- a/drl/ulogd_DRL.c +++ b/drl/ulogd_DRL.c @@ -1269,7 +1269,7 @@ static int create_htb_hierarchy(drl_instance_t *instance) { } static int setup_tc_grd(drl_instance_t *instance) { - int i; + int i, j; char cmd[300]; for (i = 0; i < instance->leaf_count; ++i) { @@ -1282,15 +1282,11 @@ static int setup_tc_grd(drl_instance_t *instance) { } /* Add the netem qdisc. */ -#ifdef DELAY40MS - sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 40ms", - instance->leaves[i].xid, instance->leaves[i].xid); -#else sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 0ms", instance->leaves[i].xid, instance->leaves[i].xid); -#endif if (execute_cmd(cmd)) { + printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd); return 1; } } @@ -1303,13 +1299,10 @@ static int setup_tc_grd(drl_instance_t *instance) { } /* Add the netem qdisc. */ -#ifdef DELAY40MS - sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 40ms"); -#else sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 0ms"); -#endif if (execute_cmd(cmd)) { + printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd); return 1; } @@ -1320,16 +1313,65 @@ static int setup_tc_grd(drl_instance_t *instance) { } /* Add the netem qdisc. */ -#ifdef DELAY40MS - sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 40ms"); -#else sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 0ms"); -#endif if (execute_cmd(cmd)) { + printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd); return 1; } + /* Artifical delay or loss for experimentation. */ + if (netem_delay.u.value || netem_loss.u.value) { + if (!strcmp(netem_slice.u.string, "ALL")) { + sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1000 handle 1000 netem loss %d delay %dms", netem_loss.u.value, netem_delay.u.value); + if (execute_cmd(cmd)) { + printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd); + return 1; + } + + sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1fff handle 1fff netem loss %d delay %dms", netem_loss.u.value, netem_delay.u.value); + if (execute_cmd(cmd)) { + printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd); + return 1; + } + + for (j = 0; j < instance->leaf_count; ++j) { + leaf_t *current = &instance->leaves[j]; + + current->delay = netem_delay.u.value; + + sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %d delay %dms", current->xid, current->xid, netem_loss.u.value, netem_delay.u.value); + + if (execute_cmd(cmd)) { + printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd); + return 1; + } + } + } else { + uint32_t slice_xid; + leaf_t *leaf = NULL; + + sscanf(netem_slice.u.string, "%x", &slice_xid); + + leaf = (leaf_t *) map_search(instance->leaf_map, &slice_xid, sizeof(slice_xid)); + + if (leaf == NULL) { + /* Leaf not found - invalid selection. */ + printf("Your experimental setup is incorrect...\n"); + return 1; + } + + leaf->delay = netem_delay.u.value; + + sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %d delay %dms", slice_xid, slice_xid, netem_loss.u.value, netem_delay.u.value); + + if (execute_cmd(cmd)) { + printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd); + return 1; + } + } + } + return 0; }