From: Kevin Webb
Date: Mon, 30 Mar 2009 19:59:20 +0000 (+0000)
Subject: Lots of changes. In no particular order:
X-Git-Tag: DistributedRateLimiting-0.1-0~31
X-Git-Url: http://git.onelab.eu/?p=distributedratelimiting.git;a=commitdiff_plain;h=f83340496f632165030cc92cd98408a87082f6b1

Lots of changes. In no particular order:

Added an accounting type called "multipleinterval" that makes decisions based on a window of history.

Separated communication from the "system tick". Now identities can specify a separate interval for accounting and communication.

Added the ability to do "shadow accounting" when using sample and hold to see what would have happened using regular accounting.

Cleaned up the FPS allocate code a lot (finally)!

Added partition simulation via SIGUSR2 and a partition_set variable in ulogd.conf (mesh only, gossip to come).

Added the option of emulating loss/delay (via netem) on a slice for testing.

Some fixes to sample&hold and a few other small things.
---

diff --git a/drl/Makefile.in b/drl/Makefile.in
index 0d1b6ec..0315752 100644
--- a/drl/Makefile.in
+++ b/drl/Makefile.in
@@ -8,7 +8,7 @@ SH_CFLAGS:=$(CFLAGS) -fPIC
 #
 SHARED_LIBS=ulogd_DRL.so
-OBJECTS=config.o drl_state.o estimate.o logging.o peer_comm.o samplehold.o simple.o standard.o ulogd_DRL.o util.o
+OBJECTS=config.o drl_state.o estimate.o logging.o multipleinterval.o peer_comm.o samplehold.o simple.o standard.o ulogd_DRL.o util.o
 all: $(SHARED_LIBS)

diff --git a/drl/calendar.h b/drl/calendar.h
index 717c8a2..ecb3b41 100644
--- a/drl/calendar.h
+++ b/drl/calendar.h
@@ -10,9 +10,8 @@
 #define SCHEDLEN (1 << SCHEDBITS)
 #define SCHEDMASK (SCHEDLEN - 1)
-/** Defines a struct ident_calendar whose elements are of type struct identity
- * (identity_t) */
-TAILQ_HEAD(ident_calendar, identity);
-
+/** Defines a struct ident_calendar whose elements are of type struct
+ * ident_action.
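
/* Editor's note: illustrative sketch, not part of the patch.  A minimal
 * calendar queue built on the <sys/queue.h> TAILQ macros, mirroring how the
 * ident_calendar slots are used by handle_estimation() further down.  The
 * toy_action type, the SCHEDBITS value, and the reschedule interval are
 * hypothetical; only the TAILQ_* macros are the real API. */

#include <stdio.h>
#include <sys/queue.h>

#define SCHEDBITS 6                 /* assumed; the real value lives in calendar.h */
#define SCHEDLEN  (1 << SCHEDBITS)
#define SCHEDMASK (SCHEDLEN - 1)

struct toy_action {
    int id;
    int interval;                   /* ticks between executions */
    TAILQ_ENTRY(toy_action) calendar;
};

TAILQ_HEAD(toy_calendar, toy_action);

int main(void) {
    struct toy_calendar cal[SCHEDLEN];
    struct toy_action a = { .id = 1, .interval = 4 };
    unsigned slot = 0, tick;

    for (tick = 0; tick < SCHEDLEN; ++tick)
        TAILQ_INIT(&cal[tick]);

    TAILQ_INSERT_TAIL(&cal[0], &a, calendar);

    /* Each tick: pop everything due in this slot and push it back
     * "interval" slots in the future, as the limiter's main loop does. */
    for (tick = 0; tick < 12; ++tick, slot = (slot + 1) & SCHEDMASK) {
        while (!TAILQ_EMPTY(&cal[slot])) {
            struct toy_action *act = TAILQ_FIRST(&cal[slot]);
            TAILQ_REMOVE(&cal[slot], act, calendar);
            printf("tick %u: running action %d\n", tick, act->id);
            TAILQ_INSERT_TAIL(&cal[(slot + act->interval) & SCHEDMASK],
                              act, calendar);
        }
    }
    return 0;
}
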
*/ +TAILQ_HEAD(ident_calendar, ident_action); #endif /* _CALENDAR_H_ */ diff --git a/drl/config.c b/drl/config.c index 1c56ce8..65d8eb9 100644 --- a/drl/config.c +++ b/drl/config.c @@ -105,7 +105,8 @@ static int parse_common(xmlDocPtr doc, xmlNodePtr ident, ident_config *common) { xmlChar *branch; xmlChar *accounting; xmlChar *ewma; - xmlChar *intervals; + xmlChar *mainloop_intervals; + xmlChar *communication_intervals; xmlNodePtr fields = ident->children; ident_peer *current = NULL; @@ -168,6 +169,8 @@ static int parse_common(xmlDocPtr doc, xmlNodePtr ident, ident_config *common) { common->accounting = ACT_SAMPLEHOLD; } else if (!xmlStrcmp(accounting, (const xmlChar *) "SIMPLE")) { common->accounting = ACT_SIMPLE; + } else if (!xmlStrcmp(accounting, (const xmlChar *) "MULTIPLEINTERVAL")) { + common->accounting = ACT_MULTIPLE; } else { printlog(LOG_CRITICAL, "Unknown/invalid accounting table.\n"); xmlFree(accounting); @@ -185,13 +188,22 @@ static int parse_common(xmlDocPtr doc, xmlNodePtr ident, ident_config *common) { xmlFree(ewma); } - intervals = xmlGetProp(ident, (const xmlChar *) "intervals"); - if (intervals == NULL) { - printlog(LOG_CRITICAL, "Ident missing interval count.\n"); - return EINVAL; + mainloop_intervals = xmlGetProp(ident, (const xmlChar *) "loop_intervals"); + if (mainloop_intervals == NULL) { + printlog(LOG_WARN, "Ident id: %d missing loop_intervals, assuming 1.\n", common->id); + common->mainloop_intervals = 1; + } else { + common->mainloop_intervals = atoi((const char *) mainloop_intervals); + xmlFree(mainloop_intervals); + } + + communication_intervals = xmlGetProp(ident, (const xmlChar *) "comm_intervals"); + if (communication_intervals == NULL) { + printlog(LOG_WARN, "Ident id: %d missing comm_intervals, assuming 1.\n", common->id); + common->communication_intervals = 1; } else { - common->intervals = atoi((const char *) intervals); - xmlFree(intervals); + common->communication_intervals = atoi((const char *) communication_intervals); + xmlFree(communication_intervals); } while (fields != NULL) { diff --git a/drl/config.h b/drl/config.h index e1a436f..82e6e1f 100644 --- a/drl/config.h +++ b/drl/config.h @@ -75,9 +75,12 @@ typedef struct ident_config { /** The fixed (1-second) ewma weight value for this identity. */ double fixed_ewma_weight; - /** The number of estimate intervals to wait between calls to estimate, + /** The number of limiter intervals to wait between calls to estimate, * allocate and enforce. */ - int intervals; + int mainloop_intervals; + + /** The number of limiter intervals to wait between communication. */ + int communication_intervals; /** The type of this identity. 
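
/* Editor's note: illustrative sketch, not part of the patch.  The
 * attribute-with-default pattern that the config.c hunk above applies to
 * "loop_intervals" and "comm_intervals".  get_int_prop() is a hypothetical
 * helper; xmlGetProp(), xmlFree(), xmlReadMemory() and
 * xmlDocGetRootElement() are the real libxml2 calls. */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <libxml/parser.h>

static int get_int_prop(xmlNodePtr node, const char *name, int fallback) {
    xmlChar *value = xmlGetProp(node, (const xmlChar *) name);
    int result;

    if (value == NULL)
        return fallback;            /* attribute missing: assume the default */

    result = atoi((const char *) value);
    xmlFree(value);
    return result;
}

int main(void) {
    const char *xml = "<ident limit=\"1000\" loop_intervals=\"5\"/>";
    xmlDocPtr doc = xmlReadMemory(xml, (int) strlen(xml), "inline.xml", NULL, 0);
    xmlNodePtr ident = xmlDocGetRootElement(doc);

    /* loop_intervals present -> 5; comm_intervals absent -> default of 1. */
    printf("loop=%d comm=%d\n",
           get_int_prop(ident, "loop_intervals", 1),
           get_int_prop(ident, "comm_intervals", 1));

    xmlFreeDoc(doc);
    return 0;
}
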
*/ enum ident_types type; diff --git a/drl/drl_state.c b/drl/drl_state.c index fed3326..ea68f51 100644 --- a/drl/drl_state.c +++ b/drl/drl_state.c @@ -215,6 +215,7 @@ void *limiter_receive_thread(void *unused) { sigemptyset(&signal_mask); sigaddset(&signal_mask, SIGHUP); sigaddset(&signal_mask, SIGUSR1); + sigaddset(&signal_mask, SIGUSR2); pthread_sigmask(SIG_BLOCK, &signal_mask, NULL); pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); diff --git a/drl/estimate.c b/drl/estimate.c index e42d7a6..21edcbf 100644 --- a/drl/estimate.c +++ b/drl/estimate.c @@ -16,12 +16,10 @@ #include "raterouter.h" #include "util.h" #include "ratetypes.h" /* needs util and pthread.h */ +#include "calendar.h" #include "logging.h" -#define PRINT_COUNTER_RESET (0) - extern uint8_t system_loglevel; -static int printcounter = PRINT_COUNTER_RESET - 1; uint8_t do_enforcement = 0; @@ -30,15 +28,30 @@ uint8_t do_enforcement = 0; * to estimate the current aggregate rate and the rate of the individual flows * in the table. */ -static void estimate(identity_t *ident) { +static void estimate(identity_t *ident, const double estintms) { struct timeval now; + double time_difference; + pthread_mutex_lock(&ident->table_mutex); /* CLUNK ! */ + gettimeofday(&now, NULL); - pthread_mutex_lock(&ident->table_mutex); /* CLUNK ! */ + time_difference = timeval_subtract(now, ident->common.last_update); + + if (time_difference > 1.05 * (estintms / 1000 * ident->mainloop_intervals)) { + printlog(LOG_WARN, "Missed interval: Scheduled for %.2f ms, actual %.2fms\n", + estintms * ident->mainloop_intervals, time_difference * 1000); + } ident->table_update_function(ident->table, now, ident->ewma_weight); +#ifdef SHADOW_ACCTING + + standard_table_update_flows((standard_flow_table) ident->shadow_table, now, + ident->ewma_weight); + +#endif + pthread_mutex_unlock(&ident->table_mutex); /* CLINK ! */ } @@ -46,27 +59,10 @@ static void estimate(identity_t *ident) { * Determines the FPS weight allocation when the identity is under its current * local rate limit. */ -static double allocate_fps_under_limit(identity_t *ident, uint32_t local_rate, double peer_weights) { - uint32_t target = local_rate; +static double allocate_fps_under_limit(identity_t *ident, uint32_t target, double peer_weights) { double ideal_weight; double total_weight = peer_weights + ident->last_localweight; - if (ident->flowstart) { - target = local_rate*4; - if (local_rate >= FLOW_START_THRESHOLD) { - ident->flowstart = false; - } - } - else { - /* June 16, 2008 (KCW) - * ident->flowstart gets set initially to one, but it is never set again. However, - * if a limiter gets flows and then the number of flows drops to zero, it has trouble - * increasing the limit again. 
*/ - if (local_rate < FLOW_START_THRESHOLD) { - ident->flowstart = true; - } - } - if (target >= ident->limit) { ideal_weight = total_weight; } else if (target <= 0) { @@ -101,12 +97,14 @@ static double allocate_fps_under_limit(identity_t *ident, uint32_t local_rate, d */ static double allocate_fps_over_limit(identity_t *ident) { double ideal_weight; + double total_over_max; if (ident->common.max_flow_rate > 0) { ideal_weight = (double) ident->locallimit / (double) ident->common.max_flow_rate; + total_over_max = (double) ident->common.rate / (double) ident->common.max_flow_rate; - printlog(LOG_DEBUG, "%.3f %d %d %d FlowCount, Limit, MaxRate, TotalRate\n", - ideal_weight, ident->locallimit, ident->common.max_flow_rate, ident->common.rate); + printlog(LOG_DEBUG, "ideal_over: %.3f, limit: %d, max_flow_rate: %d, total_rate: %d, total/max: %.3f\n", + ideal_weight, ident->locallimit, ident->common.max_flow_rate, ident->common.rate, total_over_max); } else { ideal_weight = 1; } @@ -114,11 +112,302 @@ static double allocate_fps_over_limit(identity_t *ident) { return ideal_weight; } +/** + * When FPS checks to see which mode it should be operating in + * (over limit vs under limit), we don't want it to actually look to + * see if we're at the limit. Instead, we want to see if we're getting + * close to the limit. This defines how close is "close enough". + * + * For example, if the limit is 50000 and we're sending 49000, we probably + * want to be in the over limit mode, even if we aren't actually over the limit + * in order to switch to the more aggressive weight calculations. + */ +static inline uint32_t close_enough(uint32_t limit) { + uint32_t difference = limit - (limit * CLOSE_ENOUGH); + + if (difference < 2500) { + return (limit - 2500); + } else { + return (limit * CLOSE_ENOUGH); + } +} + +static void print_statistics(identity_t *ident, const double ideal_weight, + const double total_weight, const double localweight, + const char *identifier, common_accounting_t *table, + const uint32_t resulting_limit) { + struct timeval tv; + double time_now; + + gettimeofday(&tv, NULL); + time_now = (double) tv.tv_sec + (double) ((double) tv.tv_usec / (double) 1000000); + + printlog(LOG_WARN, "%.2f %d %.2f %.2f %.2f %d %d %d %d %d %d %d %d %d %s:%d ", + time_now, table->inst_rate, ideal_weight, localweight, total_weight, + table->num_flows, table->num_flows_5k, table->num_flows_10k, + table->num_flows_20k, table->num_flows_50k, table->avg_rate, + table->max_flow_rate, table->max_flow_rate_flow_hash, resulting_limit, + identifier, ident->id); + + if (table->max_flow_rate > 0) { + printlog(LOG_WARN, "%.3f\n", (double) table->rate / (double) table->max_flow_rate); + } else { + printlog(LOG_WARN, "0\n"); + } + + /* Print to the screen in debug mode. */ + if (system_loglevel == LOG_DEBUG) { + printf("Local Rate: %d, Ideal Weight: %.3f, Local Weight: %.3f, Total Weight: %.3f\n", + table->rate, ideal_weight, ident->localweight, total_weight); + } +} + +static uint32_t allocate_fps(identity_t *ident, double total_weight, + common_accounting_t *table, const char *identifier) { + + uint32_t resulting_limit = 0; + double ideal_weight = 0.0; + double peer_weights = total_weight - ident->last_localweight; + + /* Keep track of these for measurements & comparisons only. */ + double ideal_under = 0.0; + double ideal_over = 0.0; + + /* Weight sanity. 
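
/* Editor's note: illustrative sketch, not part of the patch.  What
 * close_enough() (added above) returns for a couple of concrete limits.
 * CLOSE_ENOUGH is defined elsewhere in the tree; 0.90 is an assumed value
 * used only for this illustration. */

#include <stdio.h>
#include <stdint.h>

#define CLOSE_ENOUGH 0.90

static inline uint32_t close_enough(uint32_t limit) {
    uint32_t difference = limit - (limit * CLOSE_ENOUGH);

    /* Never demand more than 2500 bytes/sec of headroom: small limits keep
     * an absolute margin, large limits keep a percentage margin. */
    if (difference < 2500)
        return limit - 2500;
    else
        return limit * CLOSE_ENOUGH;
}

int main(void) {
    /* 10000  -> 10% headroom would be 1000 < 2500, so use 10000 - 2500 = 7500.
     * 100000 -> 10% headroom is 10000 >= 2500, so use 90% of it    = 90000. */
    printf("close_enough(10000)  = %u\n", close_enough(10000));
    printf("close_enough(100000) = %u\n", close_enough(100000));
    return 0;
}
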
*/ + if (peer_weights < 0.0) { + peer_weights = 0.0; + } + + if (ident->dampen_state == DAMPEN_TEST) { + int64_t rate_delta = (int64_t) table->inst_rate - (int64_t) table->last_inst_rate; + double threshold = (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10; + + if (rate_delta > threshold) { + ident->dampen_state = DAMPEN_PASSED; + } else { + ident->dampen_state = DAMPEN_FAILED; + } + } + + /* Rate/weight sanity. */ + if (table->rate <= 0) { + ideal_weight = 0.0; + } + + /* Under the limit OR we failed our dampening test OR our current + * outgoing traffic rate is under the low "flowstart" watermark. */ + else if (ident->dampen_state == DAMPEN_FAILED || + table->rate < close_enough(ident->locallimit)) { +#if 0 + || ident->flowstart) { + uint32_t target_rate = table->rate; + + if (ident->flowstart) { + target_rate *= 4; + + if (table->rate >= FLOW_START_THRESHOLD) { + ident->flowstart = false; + } + } else { + /* June 16, 2008 (KCW) + * ident->flowstart gets set initially to one, but it is never set again. However, + * if a limiter gets flows and then the number of flows drops to zero, it has trouble + * increasing the limit again. */ + if (table->rate < FLOW_START_THRESHOLD) { + ident->flowstart = true; + } + } + Old flowstart code. +#endif + + /* Boost low-limits so that they have room to grow. */ + if (table->rate < FLOW_START_THRESHOLD) { + ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate * 4, peer_weights); + } else { + ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights); + } + + ideal_over = allocate_fps_over_limit(ident); + + if (ideal_over < ideal_under) { + /* Degenerate case in which the agressive weight calculation was + * actually less than the under-the-limit case. Use it instead + * and skip the dampening check in the next interval. */ + ideal_weight = ideal_over; + ident->dampen_state = DAMPEN_SKIP; + } else { + ident->dampen_state = DAMPEN_NONE; + } + + /* Apply EWMA. */ + ident->localweight = (ident->localweight * ident->ewma_weight + + ideal_weight * (1 - ident->ewma_weight)); + } + + /* At or over the limit. Use the aggressive weight calculation. */ + else { + double portion_last_interval = 0.0; + double portion_this_interval = 0.0; + + ideal_weight = ideal_over = allocate_fps_over_limit(ident); + ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights); + + /* Apply EWMA. */ + ident->localweight = (ident->localweight * ident->ewma_weight + + ideal_weight * (1 - ident->ewma_weight)); + + /* Now check whether the result of the aggressive weight calculation + * increases our portion of the weight "too much", in which case we + * dampen it. */ + + /* Our portion of weight in the whole system during the last interval.*/ + portion_last_interval = ident->last_localweight / total_weight; + + /* Our proposed portion of weight for the current interval. */ + portion_this_interval = ident->localweight / (peer_weights + ident->localweight); + + if (ident->dampen_state == DAMPEN_NONE && + (portion_this_interval - portion_last_interval > LARGE_INCREASE_PERCENTAGE)) { + ident->localweight = ident->last_localweight + (LARGE_INCREASE_PERCENTAGE * total_weight); + ident->dampen_state = DAMPEN_TEST; + } else { + ident->dampen_state = DAMPEN_SKIP; + } + } + + /* Add the weight calculated in this interval to the total. */ + ident->total_weight = total_weight = ident->localweight + peer_weights; + + /* Convert weight value into a rate limit. If there is no measureable + * weight, do a L/n allocation. 
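
/* Editor's note: illustrative sketch, not part of the patch.  The FPS
 * weight-to-limit conversion performed just below, worked with concrete
 * numbers.  The weights, peer count, and global limit are invented. */

#include <stdio.h>
#include <stdint.h>

int main(void) {
    double   localweight  = 2.0;      /* this limiter's weight             */
    double   peer_weights = 6.0;      /* sum of the other limiters' weights */
    uint32_t global_limit = 100000;   /* bytes/sec shared by the whole set  */
    uint32_t peer_count   = 3;        /* remote_node_count                  */
    double   total_weight = localweight + peer_weights;
    uint32_t local_limit;

    if (total_weight > 0)
        /* Proportional share: 2/8 of 100000 = 25000 bytes/sec. */
        local_limit = (uint32_t) (localweight * global_limit / total_weight);
    else
        /* No measurable weight anywhere: fall back to an even L/n split. */
        local_limit = global_limit / (peer_count + 1);

    printf("local limit = %u bytes/sec\n", local_limit);
    return 0;
}
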
*/ + if (total_weight > 0) { + resulting_limit = (uint32_t) (ident->localweight * ident->limit / total_weight); + } else { + resulting_limit = (uint32_t) (ident->limit / (ident->comm.remote_node_count + 1)); + } + + print_statistics(ident, ideal_weight, total_weight, ident->localweight, + identifier, table, resulting_limit); + + return resulting_limit; +} + +#ifdef SHADOW_ACCTING + +/* Runs through the allocate functionality without making any state changes to + * the identity. Useful for comparisons, especially for comparing standard + * and sample&hold accounting schemes. */ +static void allocate_fps_pretend(identity_t *ident, double total_weight, + common_accounting_t *table, const char *identifier) { + + uint32_t resulting_limit = 0; + double ideal_weight = 0.0; + double peer_weights = total_weight - ident->last_localweight_copy; + double ideal_under = 0.0; + double ideal_over = 0.0; + + if (peer_weights < 0.0) { + peer_weights = 0.0; + } + + if (ident->dampen_state_copy == DAMPEN_TEST) { + int64_t rate_delta = (int64_t) table->inst_rate - (int64_t) table->last_inst_rate; + double threshold = (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10; + + if (rate_delta > threshold) { + ident->dampen_state_copy = DAMPEN_PASSED; + } else { + ident->dampen_state_copy = DAMPEN_FAILED; + } + } + + /* Rate/weight sanity. */ + if (table->rate <= 0) { + ideal_weight = 0.0; + } + + /* Under the limit OR we failed our dampening test OR our current + * outgoing traffic rate is under the low "flowstart" watermark. */ + else if (ident->dampen_state_copy == DAMPEN_FAILED || + table->rate < close_enough(ident->locallimit)) { + + /* Boost low-limits so that they have room to grow. */ + if (table->rate < FLOW_START_THRESHOLD) { + ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate * 4, peer_weights); + } else { + ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights); + } + + ideal_over = allocate_fps_over_limit(ident); + + if (ideal_over < ideal_under) { + /* Degenerate case in which the agressive weight calculation was + * actually less than the under-the-limit case. Use it instead + * and skip the dampening check in the next interval. */ + ideal_weight = ideal_over; + ident->dampen_state_copy = DAMPEN_SKIP; + } else { + ident->dampen_state_copy = DAMPEN_NONE; + } + + /* Apply EWMA. */ + ident->localweight_copy = (ident->localweight_copy * ident->ewma_weight + + ideal_weight * (1 - ident->ewma_weight)); + } + + /* At or over the limit. Use the aggressive weight calculation. */ + else { + double portion_last_interval = 0.0; + double portion_this_interval = 0.0; + + ideal_weight = ideal_over = allocate_fps_over_limit(ident); + ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights); + + /* Apply EWMA. */ + ident->localweight_copy = (ident->localweight_copy * ident->ewma_weight + + ideal_weight * (1 - ident->ewma_weight)); + + /* Now check whether the result of the aggressive weight calculation + * increases our portion of the weight "too much", in which case we + * dampen it. */ + + /* Our portion of weight in the whole system during the last interval.*/ + portion_last_interval = ident->last_localweight / total_weight; + + /* Our proposed portion of weight for the current interval. 
*/ + portion_this_interval = ident->localweight_copy / (peer_weights + ident->localweight_copy); + + if (ident->dampen_state_copy == DAMPEN_NONE && + (portion_this_interval - portion_last_interval > LARGE_INCREASE_PERCENTAGE)) { + ident->localweight_copy = ident->last_localweight + (LARGE_INCREASE_PERCENTAGE * total_weight); + ident->dampen_state_copy = DAMPEN_TEST; + } else { + ident->dampen_state_copy = DAMPEN_SKIP; + } + } + + /* Add the weight calculated in this interval to the total. */ + total_weight = ident->localweight_copy + peer_weights; + + /* Convert weight value into a rate limit. If there is no measureable + * weight, do a L/n allocation. */ + if (total_weight > 0) { + resulting_limit = (uint32_t) (ident->localweight_copy * ident->limit / total_weight); + } else { + resulting_limit = (uint32_t) (ident->limit / (ident->comm.remote_node_count + 1)); + } + + print_statistics(ident, ideal_weight, total_weight, ident->localweight_copy, + identifier, table, resulting_limit); +} + +#endif + /** * Determines the amount of FPS weight to allocate to the identity during each * estimate interval. Note that total_weight includes local weight. */ -static uint32_t allocate_fps(identity_t *ident, double total_weight) { +static uint32_t allocate_fps_old(identity_t *ident, double total_weight) { common_accounting_t *ftable = &ident->common; /* Common flow table info */ uint32_t local_rate = ftable->rate; uint32_t ideallocal = 0; @@ -160,7 +449,7 @@ static uint32_t allocate_fps(identity_t *ident, double total_weight) { if (local_rate <= 0) { idealweight = 0; } else if (dampen_increase == 0 && - (ident->locallimit <= 0 || local_rate < (ident->locallimit * CLOSE_ENOUGH) || ident->flowstart)) { + (ident->locallimit <= 0 || local_rate < close_enough(ident->locallimit) || ident->flowstart)) { /* We're under the limit - all flows are bottlenecked. */ idealweight = allocate_fps_under_limit(ident, local_rate, peer_weights); ideal_over = allocate_fps_over_limit(ident); @@ -236,6 +525,7 @@ static uint32_t allocate_fps(identity_t *ident, double total_weight) { local_rate, idealweight, ident->localweight, total_weight); } +#if 0 if (printcounter <= 0) { struct timeval tv; double time_now; @@ -260,6 +550,8 @@ static uint32_t allocate_fps(identity_t *ident, double total_weight) { printlog(LOG_DEBUG, "MIN: min said to use flow counting, which was %.3f when other method said %.3f.\n", ideal_over, ideal_under); } + See print_statistics() +#endif printlog(LOG_DEBUG, "ideallocal is %d\n", ideallocal); @@ -312,9 +604,16 @@ static void allocate(limiter_t *limiter, identity_t *ident) { ident->avg_bytes += ident->common.rate; if (limiter->policy == POLICY_FPS) { - ident->locallimit = allocate_fps(ident, comm_val); +#ifdef SHADOW_ACCTING + + allocate_fps_pretend(ident, comm_val, &ident->shadow_common, "SHADOW-ID"); + + ident->last_localweight_copy = ident->localweight_copy; +#endif + + ident->locallimit = allocate_fps(ident, comm_val, &ident->common, "ID"); ident->last_localweight = ident->localweight; - + /* Update other limiters with our weight by writing to comm layer. 
*/ write_local_value(&ident->comm, ident->localweight); } else { @@ -385,9 +684,17 @@ static void enforce(limiter_t *limiter, identity_t *ident) { } printlog(LOG_DEBUG, "%d Limit ID:%d\n", ident->locallimit, ident->id); +#if 0 if (printcounter == PRINT_COUNTER_RESET) { - printlog(LOG_WARN, "%d\n", ident->locallimit); + if (ident->common.max_flow_rate > 0) { + printlog(LOG_WARN, "%d ID:%d %.3f\n", ident->locallimit, ident->id, + (double) ident->common.rate / (double) ident->common.max_flow_rate); + } else { + printlog(LOG_WARN, "%d ID:%d 0\n", ident->locallimit, ident->id); + } } + This is now done in print_statistics() +#endif snprintf(cmd, CMD_BUFFER_SIZE, "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %dbps quantum 1600", @@ -398,7 +705,7 @@ static void enforce(limiter_t *limiter, identity_t *ident) { if (ret) { /* FIXME: call failed. What to do? */ - printlog(LOG_CRITICAL, "***TC call failed?***\n"); + printlog(LOG_CRITICAL, "***TC call failed? Call was: %s***\n", cmd); } } break; @@ -476,6 +783,12 @@ static void clean(drl_instance_t *instance) { ident->table_cleanup_function(ident->table); +#ifdef SHADOW_ACCTING + + standard_table_cleanup((standard_flow_table) ident->shadow_table); + +#endif + pthread_mutex_unlock(&ident->table_mutex); } @@ -505,11 +818,11 @@ static void print_averages(drl_instance_t *instance, int print_interval) { * of identities. * * Each identity also has a private lock for its table. This gets locked by - * table-modifying functions such as estimate and clean. + * table-modifying functions such as estimate and clean. It's also locked in + * ulogd_DRL.c when the table is being updated with new packets. */ void handle_estimation(void *arg) { limiter_t *limiter = (limiter_t *) arg; - identity_t *ident = NULL; int clean_timer, clean_wait_intervals; useconds_t sleep_time = limiter->estintms * 1000; uint32_t cal_slot = 0; @@ -520,6 +833,7 @@ void handle_estimation(void *arg) { sigemptyset(&signal_mask); sigaddset(&signal_mask, SIGHUP); sigaddset(&signal_mask, SIGUSR1); + sigaddset(&signal_mask, SIGUSR2); pthread_sigmask(SIG_BLOCK, &signal_mask, NULL); /* Determine the number of intervals we should wait before hitting the @@ -528,6 +842,8 @@ void handle_estimation(void *arg) { clean_timer = clean_wait_intervals; while (true) { + printlog(LOG_DEBUG, "--Beginning new tick.--\n"); + /* Sleep according to the delay of the estimate interval. */ usleep(sleep_time); @@ -540,26 +856,53 @@ void handle_estimation(void *arg) { /* Service all the identities that are scheduled to run during this * tick. */ while (!TAILQ_EMPTY(limiter->stable_instance.cal + cal_slot)) { - ident = TAILQ_FIRST(limiter->stable_instance.cal + cal_slot); - TAILQ_REMOVE(limiter->stable_instance.cal + cal_slot, ident, calendar); + identity_action *iaction = TAILQ_FIRST(limiter->stable_instance.cal + cal_slot); + TAILQ_REMOVE(limiter->stable_instance.cal + cal_slot, iaction, calendar); - /* Update the ident's flow accouting table with the latest info. */ - estimate(ident); + /* Only execute the action if it is valid. */ + if (iaction->valid == 0) { + free(iaction); + continue; + } + + switch (iaction->action) { + case ACTION_MAINLOOP: + + printlog(LOG_DEBUG, "Main loop: identity %d\n", iaction->ident->id); + + /* Update the ident's flow accouting table with the latest info. */ + estimate(iaction->ident, limiter->estintms); + + /* Determine its share of the rate allocation. */ + allocate(limiter, iaction->ident); - /* Determine its share of the rate allocation. 
*/ - allocate(limiter, ident); + /* Make tc calls to enforce the rate we decided upon. */ + enforce(limiter, iaction->ident); - /* Make tc calls to enforce the rate we decided upon. */ - enforce(limiter, ident); + /* Add ident back to the queue at a future time slot. */ + TAILQ_INSERT_TAIL(limiter->stable_instance.cal + + ((cal_slot + iaction->ident->mainloop_intervals) & SCHEDMASK), + iaction, calendar); + break; - /* Tell the comm library to propagate this identity's result for - * this interval.*/ - send_update(&ident->comm, ident->id); + case ACTION_COMMUNICATE: - /* Add ident back to the queue at a future time slot. */ - TAILQ_INSERT_TAIL(limiter->stable_instance.cal + - ((cal_slot + ident->intervals) & SCHEDMASK), - ident, calendar); + printlog(LOG_DEBUG, "Communicating: identity %d\n", iaction->ident->id); + + /* Tell the comm library to propagate this identity's result for + * this interval.*/ + send_update(&iaction->ident->comm, iaction->ident->id); + + /* Add ident back to the queue at a future time slot. */ + TAILQ_INSERT_TAIL(limiter->stable_instance.cal + + ((cal_slot + iaction->ident->communication_intervals) & SCHEDMASK), + iaction, calendar); + break; + + default: + printlog(LOG_CRITICAL, "Unknown identity action!?!\n"); + exit(EXIT_FAILURE); + } } print_interval--; diff --git a/drl/multipleinterval.c b/drl/multipleinterval.c new file mode 100644 index 0000000..131d7f9 --- /dev/null +++ b/drl/multipleinterval.c @@ -0,0 +1,458 @@ +/* See the DRL-LICENSE file for this file's software license. */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common_accounting.h" +#include "multipleinterval.h" +#include "logging.h" + +multiple_flow_table multiple_table_create(uint32_t (*hash_function)(const key_flow *key), uint32_t interval_count, common_accounting_t *common) { + int i; + multiple_flow_table table = malloc(sizeof(struct mul_flow_table)); + + if (table == NULL) { + return NULL; + } + + memset(table, 0, sizeof(struct mul_flow_table)); + table->common = common; + table->hash_function = hash_function; + table->interval_count = interval_count; + + gettimeofday(&table->common->last_update, NULL); + + table->intervals = malloc(interval_count * sizeof(interval)); + + if (table->intervals == NULL) { + free(table); + return NULL; + } + + memset(table->intervals, 0, interval_count * sizeof(interval)); + table->intervals[0].valid = 1; + table->intervals[0].last_update = table->common->last_update; + + for (i = 0; i < interval_count; ++i) { + table->intervals[i].next = &table->intervals[(i + 1) % interval_count]; + } + + table->current_interval = &table->intervals[0]; + + return table; +} + +void multiple_table_destroy(multiple_flow_table table) { + multiple_flow *current, *next; + + if ((current = table->flows_head)) { + while (current->next) { + next = current->next; + free(current->intervals); + free(current); + current = next; + } + free(current->intervals); + free(current); + } + + free(table->intervals); + free(table); +} + +/* Looks for the flow in the table. If the flow isn't there, it allocates a + * place for it. */ +multiple_flow *multiple_table_lookup(multiple_flow_table table, const key_flow *key) { + uint32_t hash; + multiple_flow *flow; + struct in_addr src, dst; + char sip[22], dip[22]; + int i; + + if (table == NULL) { + return NULL; + } + + hash = table->hash_function(key); + + /* Find the flow, if it's there. 
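
/* Editor's note: illustrative sketch, not part of the patch.  Bucketizing a
 * 5-tuple flow key into a power-of-two hash table, the same shape as
 * multiple_table_lookup() above.  FNV-1a stands in for the jhash() that
 * ulogd_DRL.c's hash_flow() actually uses; the key values are made up. */

#include <stdio.h>
#include <stdint.h>
#include <string.h>

#define BUCKETS 1024   /* must stay a power of two for the & mask */

struct toy_key {
    uint8_t  protocol;
    uint32_t source_ip, dest_ip;
    uint16_t source_port, dest_port;
};

static uint32_t fnv1a(const void *data, size_t len) {
    const uint8_t *bytes = data;
    uint32_t hash = 2166136261u;
    size_t i;

    for (i = 0; i < len; ++i) {
        hash ^= bytes[i];
        hash *= 16777619u;
    }
    return hash;
}

static uint32_t bucket_of(const struct toy_key *key) {
    /* Pack the 13-byte key (protocol, IPs, ports), hash it, and mask it
     * down to a bucket index, as hash_flow() does with jhash(). */
    uint8_t packed[13];

    packed[0] = key->protocol;
    memcpy(&packed[1], &key->source_ip, 4);
    memcpy(&packed[5], &key->dest_ip, 4);
    memcpy(&packed[9], &key->source_port, 2);
    memcpy(&packed[11], &key->dest_port, 2);

    return fnv1a(packed, sizeof(packed)) & (BUCKETS - 1);
}

int main(void) {
    struct toy_key key = { 6, 0x0a000001, 0x0a000002, 12345, 80 };
    printf("bucket = %u\n", bucket_of(&key));
    return 0;
}
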
*/ + for (flow = table->flows[hash]; flow; flow = flow->nexth) { + if (flow->source_ip == key->source_ip && + flow->dest_ip == key->dest_ip && + flow->source_port == key->source_port && + flow->dest_port == key->dest_port && + flow->protocol == key->protocol) { + break; + } + } + + if (flow == NULL) { + flow = malloc(sizeof(multiple_flow)); + if (flow == NULL) { + printlog(LOG_CRITICAL, "multipleinterval.c: Malloc returned NULL.\n"); + return NULL; + } + memset(flow, 0, sizeof(multiple_flow)); + + flow->intervals = malloc(table->interval_count * sizeof(interval)); + if (flow->intervals == NULL) { + free(flow); + printlog(LOG_CRITICAL, "multipleinterval.c: Malloc returned NULL.\n"); + return NULL; + } + memset(flow->intervals, 0, table->interval_count * sizeof(interval)); + + flow->protocol = key->protocol; + flow->source_ip = key->source_ip; + flow->dest_ip = key->dest_ip; + flow->source_port = key->source_port; + flow->dest_port = key->dest_port; + + flow->intervals[0].last_packet = key->packet_time; + flow->intervals[0].last_update = table->common->last_update; + flow->intervals[0].valid = 1; + + for (i = 0; i < table->interval_count; ++i) { + flow->intervals[i].next = &flow->intervals[(i + 1) % table->interval_count]; + } + + flow->current_interval = &flow->intervals[0]; + + /* Add the flow to the hash list. */ + flow->nexth = table->flows[hash]; + table->flows[hash] = flow; + + /* Add the flow to the linked list. */ + if (table->flows_tail) { + flow->prev = table->flows_tail; + table->flows_tail->next = flow; + table->flows_tail = flow; + } else { + table->flows_head = table->flows_tail = flow; + /* next and prev are already null due to memset above. */ + } + + src.s_addr = ntohl(flow->source_ip); + dst.s_addr = ntohl(flow->dest_ip); + strcpy(sip, inet_ntoa(src)); + strcpy(dip, inet_ntoa(dst)); + printlog(LOG_DEBUG, "ALLOC:%s:%hu -> %s:%hu\n", sip, + flow->source_port, dip, flow->dest_port); + } + + return flow; +} + +int multiple_table_sample(multiple_flow_table table, const key_flow *key) { + multiple_flow *flow; + + assert(table != NULL); + assert(table->common != NULL); + + /* Update aggregate. */ + //table->common->bytes_since += key->packet_size; + table->current_interval->bytes_since += key->packet_size; + table->current_interval->valid = 1; + + /* Update flow. */ + flow = multiple_table_lookup(table, key); + if (flow == NULL) { + return 0; + } + + /* Update flow's last packet info so that we know when to delete. */ + flow->last_packet = key->packet_time; + + /* Update interval information. */ + flow->current_interval->bytes_since += key->packet_size; + flow->current_interval->last_packet = key->packet_time; + flow->current_interval->valid = 1; + + return 1; +} + +void multiple_table_remove(multiple_flow_table table, multiple_flow *flow) { + key_flow key; + uint32_t hash; + + assert(flow); + + /* Remove the flow from the hash list. */ + key.source_ip = flow->source_ip; + key.dest_ip = flow->dest_ip; + key.source_port = flow->source_port; + key.dest_port = flow->dest_port; + key.protocol = flow->protocol; + + hash = table->hash_function(&key); + + assert(table->flows[hash]); + + if (table->flows[hash] == flow) { + /* It's the head of the hash list. 
*/ + table->flows[hash] = flow->nexth; + } else { + multiple_flow *current, *prev; + + prev = table->flows[hash]; + + for (current = table->flows[hash]->nexth; current; current = current->nexth) { + if (current == flow) { + prev->nexth = flow->nexth; + break; + } else { + prev = current; + } + } + + if (current == NULL) { + printlog(LOG_CRITICAL, "Flow %p disappeared?\n", flow); + } + assert(current != NULL); + } + + /* Remove the flow from the linked list. */ + if (flow->prev == NULL && flow->next == NULL) { + /* It's the head, tail, and only element of the list. */ + assert(table->flows_head == flow); + assert(table->flows_tail == flow); + + table->flows_head = NULL; + table->flows_tail = NULL; + } else if (flow->prev == NULL) { + /* It's the head of the list. */ + assert(table->flows_head == flow); + + table->flows_head = flow->next; + + if (table->flows_head != NULL) { + table->flows_head->prev = NULL; + } + } else if (flow->next == NULL) { + /* It's the tail of the list. */ + assert(table->flows_tail == flow); + + table->flows_tail = flow->prev; + + table->flows_tail->next = NULL; + } else { + /* Not the head or tail of the list. */ + assert(table->flows_head != flow); + + flow->prev->next = flow->next; + + if (flow->next != NULL) { + flow->next->prev = flow->prev; + } + } + + /* Free the interval info. */ + memset(flow->intervals, 0, table->interval_count * sizeof(interval)); + free(flow->intervals); + + /* Free the flow. */ + memset(flow, 0, sizeof(multiple_flow)); + free(flow); +} + +int multiple_table_cleanup(multiple_flow_table table) { + multiple_flow *current = table->flows_head; + multiple_flow *remove; + time_t now = time(NULL); + + while (current != NULL) { + if (current->last_packet + MUL_FLOW_IDLE_TIME <= now) { + /* Flow hasn't received a packet in the time limit - kill it. */ + remove = current; + current = current->next; + + multiple_table_remove(table, remove); + } else { + current = current->next; + } + } + + return 0; +} + +static interval *get_oldest_interval(interval *newest) { + interval *candidate = newest; + interval *oldest = NULL; + + while (oldest == NULL) { + candidate = candidate->next; + + if (candidate == newest) { + oldest = newest; + } else if (candidate->valid) { + oldest = candidate; + } + } + + return oldest; +} + +static uint32_t get_bytes_over_interval(interval *newest, interval *oldest) { + uint32_t result = newest->bytes_since; + interval *current = oldest; + + while (current != newest) { + result += current->bytes_since; + current = current->next; + } + + return result; +} + +void multiple_table_update_flows(multiple_flow_table table, struct timeval now, double ewma_weight) { + uint32_t maxflowrate = 0; + double time_delta; + double unweighted_rate; + multiple_flow *current; + struct in_addr src, dst; + char sip[22], dip[22]; + key_flow largest_flow_info; + + /* Table interval variables. */ + interval *table_newest = NULL; + interval *table_oldest = NULL; + uint32_t table_bytes_over_intervals = 0; + + /* Reset statistics. */ + table->common->num_flows = 0; + table->common->num_flows_5k = 0; + table->common->num_flows_10k = 0; + table->common->num_flows_20k = 0; + table->common->num_flows_50k = 0; + table->common->avg_rate = 0; + /* End statistics. 
*/ + + table_newest = table->current_interval; + table_oldest = get_oldest_interval(table_newest); + + table_bytes_over_intervals = get_bytes_over_interval(table_newest, table_oldest); + + time_delta = timeval_subtract(now, table_oldest->last_update); + + if (time_delta <= 0) { + unweighted_rate = 0; + } else { + unweighted_rate = table_bytes_over_intervals / time_delta; + } + + table->common->last_inst_rate = table->common->inst_rate; + table->common->inst_rate = unweighted_rate; + printf("Unweighted rate is: %.3f, computed from %d bytes in %f seconds\n", unweighted_rate, table_bytes_over_intervals, time_delta); + + table->common->last_rate = table->common->rate; + + /* If the rate is zero, then we don't know anything yet. Don't apply EWMA + * in that case. */ + if (table->common->rate == 0) { + table->common->rate = unweighted_rate; + } else { + //FIXME: Continue to use ewma here? + table->common->rate = table->common->rate * ewma_weight + unweighted_rate * (1 - ewma_weight); + } + + table->common->last_update = now; + table->current_interval = table->current_interval->next; + table->current_interval->last_update = now; + table->current_interval->bytes_since = 0; + table->current_interval->valid = 1; + + /* Update per-flow information. */ + for (current = table->flows_head; current; current = current->next) { + interval *newest = current->current_interval; + interval *oldest = get_oldest_interval(newest); + uint32_t bytes_over_intervals = 0; + + /* This flow is invalid - don't consider it further. */ + if (newest->valid == 0) { + printlog(LOG_WARN, "Found invalid flow in table.\n"); + continue; + } + + time_delta = timeval_subtract(now, oldest->last_update); + bytes_over_intervals = get_bytes_over_interval(newest, oldest); + + if (time_delta <= 0) { + unweighted_rate = 0; + } else { + unweighted_rate = bytes_over_intervals / time_delta; + } + + current->last_rate = current->rate; + + if (current->rate == 0) { + current->rate = unweighted_rate; + } else { + //FIXME: Continue to use ewma here? + current->rate = current->rate * ewma_weight + unweighted_rate * (1 - ewma_weight); + } + + /* Update the accounting info for intervals. 
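
/* Editor's note: illustrative sketch, not part of the patch.  The
 * windowed-rate estimate that multiple_table_update_flows() computes from
 * its ring of intervals, reduced to a standalone circular buffer.  The
 * window size, interval length, and traffic trace are invented; the real
 * code additionally tracks per-interval timestamps and a validity flag. */

#include <stdio.h>
#include <stdint.h>
#include <string.h>

#define WINDOW 4

struct ring {
    uint32_t bytes[WINDOW];   /* bytes accounted in each interval        */
    int      newest;          /* index of the interval currently filling */
};

/* Sum the window and divide by its duration, like
 * get_bytes_over_interval() plus timeval_subtract() in the real code. */
static double windowed_rate(const struct ring *r, double interval_sec) {
    uint64_t total = 0;
    int i;

    for (i = 0; i < WINDOW; ++i)
        total += r->bytes[i];

    return (double) total / (WINDOW * interval_sec);
}

/* Rotate to the next interval, clearing its counter first. */
static void advance(struct ring *r) {
    r->newest = (r->newest + 1) % WINDOW;
    r->bytes[r->newest] = 0;
}

int main(void) {
    struct ring r;
    uint32_t trace[] = { 1500, 3000, 0, 4500, 6000, 1500 };
    unsigned i;

    memset(&r, 0, sizeof(r));

    for (i = 0; i < sizeof(trace) / sizeof(trace[0]); ++i) {
        r.bytes[r.newest] += trace[i];                 /* "sample" packets */
        printf("tick %u: rate = %.1f bytes/sec\n",
               i, windowed_rate(&r, 0.5));             /* 0.5 s per tick   */
        advance(&r);
    }
    return 0;
}
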
*/ + current->current_interval = current->current_interval->next; + current->current_interval->last_update = now; + current->current_interval->bytes_since = 0; + current->current_interval->valid = 1; + + if (current->rate > maxflowrate) { + maxflowrate = current->rate; + largest_flow_info.source_ip = current->source_ip; + largest_flow_info.dest_ip = current->dest_ip; + largest_flow_info.source_port = current->source_port; + largest_flow_info.dest_port = current->dest_port; + largest_flow_info.protocol = current->protocol; + } + + if (current->rate > 51200) { + table->common->num_flows_50k += 1; + table->common->num_flows_20k += 1; + table->common->num_flows_10k += 1; + table->common->num_flows_5k += 1; + table->common->num_flows += 1; + } else if (current->rate > 20480) { + table->common->num_flows_20k += 1; + table->common->num_flows_10k += 1; + table->common->num_flows_5k += 1; + table->common->num_flows += 1; + } else if (current->rate > 10240) { + table->common->num_flows_10k += 1; + table->common->num_flows_5k += 1; + table->common->num_flows += 1; + } else if (current->rate > 5120) { + table->common->num_flows_5k += 1; + table->common->num_flows += 1; + } else { + table->common->num_flows += 1; + } + + src.s_addr = ntohl(current->source_ip); + dst.s_addr = ntohl(current->dest_ip); + strcpy(sip, inet_ntoa(src)); + strcpy(dip, inet_ntoa(dst)); + printlog(LOG_DEBUG, "FLOW: (%p) %s:%d -> %s:%d at %d\n", current, + sip, current->source_port, + dip, current->dest_port, + current->rate); + } + + if (table->common->num_flows > 0) { + table->common->avg_rate = table->common->rate / table->common->num_flows; + } + + printlog(LOG_DEBUG, "FLOW:--\n--\n"); + + table->common->max_flow_rate = maxflowrate; + table->common->max_flow_rate_flow_hash = table->hash_function(&largest_flow_info); +} diff --git a/drl/multipleinterval.h b/drl/multipleinterval.h new file mode 100644 index 0000000..3fc007e --- /dev/null +++ b/drl/multipleinterval.h @@ -0,0 +1,174 @@ +/* See the DRL-LICENSE file for this file's software license. */ + +/* + * Defines the multiple-interval "perfect" flow accounting table. + * + */ + +#ifndef _MULTIPLE_ACCOUNTING_H_ +#define _MULTIPLE_ACCOUNTING_H_ + +#include + +//FIXME: Update comments on structure variables + +/** The number of hash buckets in the table. */ +#define MUL_FLOW_HASH_SIZE 1024 + +/** The number of seconds after which a flow is considered to be inactive. + * Inactive flows will be removed from the table during the next call to the + * cleanup function. */ +#define MUL_FLOW_IDLE_TIME 15 + +#define MUL_INTERVAL_COUNT 10 + +typedef struct mul_interval { + /** The number of bytes this flow has sent since the last update to this + * interval structure. */ + uint32_t bytes_since; + + /** The time at which this interval was last updated. */ + struct timeval last_update; + + /** The time at which the most recent packet in this flow was received + * in this interval. */ + time_t last_packet; + + uint32_t valid; + + struct mul_interval *next; +} interval; + +/** Representation of a flow in a multiple-interval table. */ +typedef struct mul_flow { + /* Flow information. */ + + /** The rate of the flow in the current estimate interval. */ + uint32_t rate; + + /** The rate of the flow in the previous estimate interval. */ + uint32_t last_rate; + + time_t last_packet; + + interval *current_interval; + + interval *intervals; + + /* Identification information. */ + + /** The flow's source IP address. */ + uint32_t source_ip; + + /** The flow's destination IP address. 
*/ + uint32_t dest_ip; + + /** The flow's source port. */ + uint16_t source_port; + + /** The flow's destination port. */ + uint16_t dest_port; + + /** The flow's protocol. This corresponds to the protocol field of the IP + * header. */ + uint8_t protocol; + + /* Table state. */ + + /** Pointer to the next flow in the hash list. */ + struct mul_flow *nexth; + + /** Pointers to the next flow in the linked list. */ + struct mul_flow *next; + + /** Pointers to the previous flow in the linked list. */ + struct mul_flow *prev; + +} multiple_flow; + +/** + * The "table" that stores the flows. It's constructed of two main pieces. + * + * The first is an array of hash buckets. Flows are classified into buckets + * by hashing the flow's values (key flow) using the table's hash function. + * Flows are chained together as a list in each bucket to deal with collisions. + * + * The second is a simple doubly linked list containing every flow in the table. + */ +struct mul_flow_table { + + /** Pointer to the common flow information for the identity that owns this + * sampled flow table. This is updated with aggregate information. */ + common_accounting_t *common; + + uint32_t interval_count; + + interval *current_interval; + + interval *intervals; + + /* Table structures. */ + + /** Hash buckets - each is a list of flows. */ + struct mul_flow *flows[MUL_FLOW_HASH_SIZE]; + + /** The head of the linked list of flows. */ + struct mul_flow *flows_head; + + /** The tail of the linked list of flows. */ + struct mul_flow *flows_tail; + + /** Function pointer to the function that will yield the hash of a + * key_flow. + */ + uint32_t (*hash_function)(const key_flow *key); + +}; + +/** The type multiple_flow_table is really a pointer to a struct + * mul_flow_table. */ +typedef struct mul_flow_table *multiple_flow_table; + +/** + * Creates a new table that will use the specified hash function. + * + * Returns the new table or NULL on failure. + */ +multiple_flow_table multiple_table_create(uint32_t (*hash_function)(const key_flow *key), uint32_t interval_count, common_accounting_t *common); + +/** + * Destroys the specified table. + */ +void multiple_table_destroy(multiple_flow_table table); + +/** + * Looks for a flow that matches the given key in the specified table. If a + * matching flow is not found, this function will allocate a new flow for the + * key. + * + * Returns a pointer to the flow that matches the key or NULL if there is no + * matching flow and a new flow couldn't be allocated. + */ +multiple_flow *multiple_table_lookup(multiple_flow_table table, const key_flow *key); + +/** + * Updates the state of the table given a newly acquired packet. + * + * Returns 1 if the flow is in the table. 0 otherwise (indicating that memory + * could not be allocated to add a new flow to the table for the given key. + */ +int multiple_table_sample(multiple_flow_table table, const key_flow *key); + +/** + * Cleans the table by removing flow entires for any flow that hasn't seen a + * new packet in the interval specified by MUL_FLOW_IDLE_TIME seconds. + */ +int multiple_table_cleanup(multiple_flow_table table); + +/** + * Updates the rate information for all flows in the table according to the + * specified current time and EWMA weight. 
+ */ +void multiple_table_update_flows(multiple_flow_table table, struct timeval now, double ewma_weight); + +#endif /* _MULTIPLE_ACCOUNTING_H_ */ diff --git a/drl/peer_comm.c b/drl/peer_comm.c index 869cde9..41ed778 100644 --- a/drl/peer_comm.c +++ b/drl/peer_comm.c @@ -42,6 +42,10 @@ #include "peer_comm.h" #include "logging.h" +/* Artifically makes a network partition. */ +int do_partition = 0; +int partition_set = 0xfffffff; + extern limiter_t limiter; static const uint32_t MAGIC_MSG = 0x123123; @@ -386,11 +390,22 @@ static void ident_receive(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock) } #endif +#define ALLOW_PARTITION + int send_udp_mesh(comm_t *comm, uint32_t id, int sock) { int result = 0; remote_limiter_t *remote; message_t msg; struct sockaddr_in toaddr; + int i; + +#ifdef ALLOW_PARTITION + + int partition_count = 0; + struct in_addr dest; + char dest_ip[22]; + +#endif memset(&toaddr, 0, sizeof(struct sockaddr_in)); toaddr.sin_family = AF_INET; @@ -404,8 +419,28 @@ int send_udp_mesh(comm_t *comm, uint32_t id, int sock) { message_to_nbo(&msg); /* Iterate though and send update to all remote limiters in our identity. */ - map_reset_iterate(comm->remote_node_map); - while ((remote = map_next(comm->remote_node_map))) { + for (i = 0; i < comm->remote_node_count; ++i) { + remote = &comm->remote_limiters[i]; + +#ifdef ALLOW_PARTITION + + if (do_partition) { + printlog(LOG_DEBUG, "Testing partition, partition set is %x, count is %d, test is %d.\n", + partition_set, partition_count, partition_set & (1 << partition_count)); + /* If the partition count bit isn't high in the set, don't actually send anything. */ + if ((partition_set & (1 << partition_count)) == 0) { + dest.s_addr = ntohl(remote->addr); + strcpy(dest_ip, inet_ntoa(dest)); + + printlog(LOG_DEBUG, "Partition: ignoring host %s\n", dest_ip); + + partition_count += 1; + continue; + } + } + +#endif + toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */ toaddr.sin_port = remote->port; if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) { @@ -414,6 +449,7 @@ int send_udp_mesh(comm_t *comm, uint32_t id, int sock) { printlog(LOG_WARN, " - The error was |%d|\n", strerror(result)); break; } + partition_count += 1; } return result; @@ -435,6 +471,8 @@ int send_udp_gossip(comm_t *comm, uint32_t id, int sock) { for (i = 0; i < comm->gossip.gossip_branch; ++i) { message_t msg; + printlog(LOG_DEBUG, "Gossip loop iteration, i=%d, branch=%d\n", i, comm->gossip.gossip_branch); + if (comm->retrys[i] >= 0) { remote = &comm->remote_limiters[comm->retrys[i]]; targetid = comm->retrys[i]; diff --git a/drl/raterouter.h b/drl/raterouter.h index 789cd95..2594f12 100644 --- a/drl/raterouter.h +++ b/drl/raterouter.h @@ -34,7 +34,8 @@ enum policies { POLICY_GRD = 1, POLICY_FPS = 2 }; enum commfabrics { COMM_MESH = 1, COMM_GOSSIP = 2 }; -enum accountings { ACT_STANDARD = 1, ACT_SAMPLEHOLD = 2, ACT_SIMPLE = 3 }; +enum accountings { ACT_STANDARD = 1, ACT_SAMPLEHOLD = 2, ACT_SIMPLE = 3, ACT_MULTIPLE = 4}; +enum dampenings { DAMPEN_NONE = 0, DAMPEN_TEST = 1, DAMPEN_FAILED = 2, DAMPEN_PASSED = 3, DAMPEN_SKIP = 4}; /* The comm library also has definitions for comfabrics. This prevents us * from defining them twice. */ @@ -74,6 +75,12 @@ enum accountings { ACT_STANDARD = 1, ACT_SAMPLEHOLD = 2, ACT_SIMPLE = 3 }; */ #define FLOWKEYSIZE (13) +/* Causes each identity to track every flow in two tables. One table is as + * specified in the config file. 
The second is a standard table with + * "perfect" accounting so that we can compare the two. Turn this off for + * any type of production setting. */ +#define SHADOW_ACCTING + /* forward declare some structs */ struct limiter; struct identity; diff --git a/drl/ratetypes.h b/drl/ratetypes.h index 59a481a..319d071 100644 --- a/drl/ratetypes.h +++ b/drl/ratetypes.h @@ -17,13 +17,14 @@ #include "rate_accounting/simple.h" #endif -#include "calendar.h" +#include "bsd_queue.h" #include "config.h" #include "drl_state.h" #include "common_accounting.h" #include "standard.h" #include "samplehold.h" #include "simple.h" +#include "multipleinterval.h" /** Represents a DRL entitiy/group. */ @@ -85,6 +86,9 @@ typedef struct identity { * flows to grow before incurring losses. */ int flowstart; + /** Keeps track of the state of FPS dampening for this identity. */ + enum dampenings dampen_state; + /* GRD */ /** GRD drop probability information. */ @@ -126,17 +130,47 @@ typedef struct identity { /** Function to call when the table should be destroyed. */ void (*table_destroy_function)(void *); +#ifdef SHADOW_ACCTING + + common_accounting_t shadow_common; + + void *shadow_table; + + double localweight_copy; + double last_localweight_copy; + + enum dampenings dampen_state_copy; + +#endif + /* Scheduling bookkeeping. */ - /* Pointers to other identities in the scheduling calendar. */ - TAILQ_ENTRY(identity) calendar; + /** Scheduling unit that tells the limiter when to execute the main loop.*/ + struct ident_action *loop_action; - /* The number of limiter ticks at which this identity should be scheduled. - */ - uint32_t intervals; + /** Scheduling unit that tells the limiter when to communicate.*/ + struct ident_action *comm_action; + + /** The number of limiter ticks that should pass before this identity should + * be scheduled to execute its main estimate/allocate/enforce loop. */ + uint32_t mainloop_intervals; + + /** The number of limiter ticks that should pass before this identity should + * be scheduled for communication. */ + uint32_t communication_intervals; } identity_t; +enum identity_actions { ACTION_MAINLOOP = 1, ACTION_COMMUNICATE = 2 }; + +typedef struct ident_action { + struct identity *ident; + enum identity_actions action; + int valid; + + TAILQ_ENTRY(ident_action) calendar; +} identity_action; + /** * Represents the bottom most entity in the HTB hierarchy. For PlanetLab, * this corresponds to sliver (identified by Vserver context id, or xid). diff --git a/drl/samplehold.c b/drl/samplehold.c index 8ebab9d..8c566d4 100644 --- a/drl/samplehold.c +++ b/drl/samplehold.c @@ -1,10 +1,13 @@ /* See the DRL-LICENSE file for this file's software license. */ +#include #include +#include #include #include #include #include +#include #include #include "common_accounting.h" @@ -74,14 +77,16 @@ sampled_flow_table sampled_table_create(uint32_t (*hash_function)(const key_flow return NULL; } - table->capacity = (uint32_t) ((base_size * oversampling_factor) * 1.03); + table->capacity = (uint32_t) (base_size * oversampling_factor); table->size = 0; table->hash_function = hash_function; table->sample_prob = (double) (((double) table->capacity / (double) max_bytes) * (double) RANDOM_GRANULARITY); table->threshold = (double) ((double) flow_percentage / 100) * max_bytes; + + /* Allocate the backing and give it a little bit extra to deal with variance. 
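
/* Editor's note: illustrative sketch, not part of the patch.  The per-packet
 * sample-and-hold decision behind the table above: a packet from an
 * untracked flow is admitted with probability that grows with its size;
 * once a flow is held, every subsequent byte is counted exactly.  The
 * probability constant and the rand()-based coin flip are simplifications
 * of the real sample_prob / RANDOM_GRANULARITY arithmetic. */

#include <stdio.h>
#include <stdlib.h>
#include <math.h>

/* Probability of starting to track a flow, per byte observed (assumed). */
#define BYTE_SAMPLE_PROB 0.0001

struct toy_flow {
    int      held;        /* has this flow been sampled into the table? */
    unsigned bytes;       /* exact byte count once held                 */
};

static void sample_packet(struct toy_flow *flow, unsigned packet_size) {
    if (!flow->held) {
        /* Coin flip weighted by packet size: bigger packets are more
         * likely to push their flow over the sampling threshold. */
        double p = 1.0 - pow(1.0 - BYTE_SAMPLE_PROB, (double) packet_size);
        if ((double) rand() / RAND_MAX < p)
            flow->held = 1;
        else
            return;                 /* not tracked yet: bytes go uncounted */
    }
    flow->bytes += packet_size;     /* held: exact accounting              */
}

int main(void) {
    struct toy_flow flow = { 0, 0 };
    int i;

    srand(1234);
    for (i = 0; i < 1000; ++i)
        sample_packet(&flow, 1500);

    printf("held=%d, counted bytes=%u (sent %u)\n",
           flow.held, flow.bytes, 1000u * 1500u);
    return 0;
}
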
*/ table->largest = NULL; - table->backing = malloc(sizeof(sampled_flow) * table->capacity); + table->backing = malloc(sizeof(sampled_flow) * table->capacity * 1.05); if (table->backing == NULL) { free(table); @@ -271,6 +276,8 @@ void sampled_table_update_flows(sampled_flow_table table, struct timeval now, do uint32_t rate_delta = 0; double time_delta = 0; double unweighted_rate = 0; + struct in_addr src, dst; + char sip[22], dip[22]; /* Update common aggregate information. */ time_delta = timeval_subtract(now, table->common->last_update); @@ -295,8 +302,11 @@ void sampled_table_update_flows(sampled_flow_table table, struct timeval now, do unweighted_rate * (1 - ewma_weight); } + printlog(LOG_DEBUG, "table->common->rate is now %u\n", table->common->rate); + table->common->bytes_since = 0; table->common->last_update = now; + table->common->num_flows = 0; /* Update per-flow information. */ table->largest = &table->backing[i]; @@ -329,6 +339,18 @@ void sampled_table_update_flows(sampled_flow_table table, struct timeval now, do largest_rate = table->backing[i].rate; table->largest = &table->backing[i]; } + + table->common->num_flows += 1; + + /* Print debugging info. */ + src.s_addr = ntohl(table->backing[i].source_ip); + dst.s_addr = ntohl(table->backing[i].dest_ip); + strcpy(sip, inet_ntoa(src)); + strcpy(dip, inet_ntoa(dst)); + printlog(LOG_DEBUG, "FLOW: (%p) %s:%d -> %s:%d at %d\n", &table->backing[i], + sip, table->backing[i].source_port, + dip, table->backing[i].dest_port, + table->backing[i].rate); } } diff --git a/drl/samplehold.h b/drl/samplehold.h index 5a554ef..f96fa26 100644 --- a/drl/samplehold.h +++ b/drl/samplehold.h @@ -28,7 +28,10 @@ #define FLOW_DELETED 1 #define FLOW_USED 2 -#define RANDOM_GRANULARITY 1000 +#define RANDOM_GRANULARITY (1000) + +#define SAMPLEHOLD_PERCENTAGE (5) +#define SAMPLEHOLD_OVERFACTOR (10) /** In-table representation of a flow that has been sampled. */ typedef struct sampled_flow { diff --git a/drl/standard.c b/drl/standard.c index f8f813c..9c996e3 100644 --- a/drl/standard.c +++ b/drl/standard.c @@ -221,7 +221,7 @@ int standard_table_cleanup(standard_flow_table table) { time_t now = time(NULL); while (current != NULL) { - if (current->last_packet + FLOW_IDLE_TIME <= now) { + if (current->last_packet + STD_FLOW_IDLE_TIME <= now) { /* Flow hasn't received a packet in the time limit - kill it. */ remove = current; current = current->next; diff --git a/drl/standard.h b/drl/standard.h index d54aa3e..27a3d8c 100644 --- a/drl/standard.h +++ b/drl/standard.h @@ -11,12 +11,12 @@ #include /** The number of hash buckets in the table. */ -#define FLOW_HASH_SIZE 1024 +#define STD_FLOW_HASH_SIZE 1024 /** The number of seconds after which a flow is considered to be inactive. * Inactive flows will be removed from the table during the next call to the * cleanup function. */ -#define FLOW_IDLE_TIME 15 +#define STD_FLOW_IDLE_TIME 15 /** Representation of a flow in a standard table. */ typedef struct std_flow { @@ -86,7 +86,7 @@ struct std_flow_table { /* Table structures. */ /** Hash buckets - each is a list of flows. */ - struct std_flow *flows[FLOW_HASH_SIZE]; + struct std_flow *flows[STD_FLOW_HASH_SIZE]; /** The head of the linked list of flows. */ struct std_flow *flows_head; @@ -137,7 +137,7 @@ int standard_table_sample(standard_flow_table table, const key_flow *key); /** * Cleans the table by removing flow entires for any flow that hasn't seen a - * new packet in the interval specified by FLOW_IDLE_TIME seconds. 
+ * new packet in the interval specified by STD_FLOW_IDLE_TIME seconds. */ int standard_table_cleanup(standard_flow_table table); diff --git a/drl/ulogd_DRL.c b/drl/ulogd_DRL.c index 14de4fe..f785d30 100644 --- a/drl/ulogd_DRL.c +++ b/drl/ulogd_DRL.c @@ -115,8 +115,8 @@ /* DRL specifics */ #include "raterouter.h" #include "util.h" -#include "calendar.h" #include "ratetypes.h" /* needs util and pthread.h */ +#include "calendar.h" #include "logging.h" /* @@ -124,8 +124,40 @@ * Add the config options for DRL. */ -static config_entry_t drl_configfile = { +static config_entry_t partition = { .next = NULL, + .key = "partition_set", + .type = CONFIG_TYPE_INT, + .options = CONFIG_OPT_NONE, + .u = { .value = 0xfffffff }, +}; + +static config_entry_t netem_slice = { + .next = &partition, + .key = "netem_slice", + .type = CONFIG_TYPE_STRING, + .options = CONFIG_OPT_NONE, + .u = { .string = "ALL" }, +}; + +static config_entry_t netem_loss = { + .next = &netem_slice, + .key = "netem_loss", + .type = CONFIG_TYPE_INT, + .options = CONFIG_OPT_NONE, + .u = { .value = 0 }, +}; + +static config_entry_t netem_delay = { + .next = &netem_loss, + .key = "netem_delay", + .type = CONFIG_TYPE_INT, + .options = CONFIG_OPT_NONE, + .u = { .value = 0 }, +}; + +static config_entry_t drl_configfile = { + .next = &netem_delay, .key = "drl_configfile", .type = CONFIG_TYPE_STRING, .options = CONFIG_OPT_MANDATORY, @@ -231,25 +263,34 @@ extern FILE *logfile; extern uint8_t system_loglevel; extern uint8_t do_enforcement; +/* From peer_comm.c - used to simulate partition. */ +extern int do_partition; +extern int partition_set; + /* functions */ static inline uint32_t -hash_flow(uint8_t protocol, uint32_t src_ip, uint16_t src_port, uint32_t dst_ip, uint16_t dst_port) +hash_flow(uint8_t protocol, uint32_t src_ip, uint16_t src_port, uint32_t dst_ip, uint16_t dst_port, uint32_t hash_max) { unsigned char mybytes[FLOWKEYSIZE]; mybytes[0] = protocol; *(uint32_t*)(&(mybytes[1])) = src_ip; *(uint32_t*)(&(mybytes[5])) = dst_ip; *(uint32_t*)(&(mybytes[9])) = (src_port << 16) | dst_port; - return jhash(mybytes,FLOWKEYSIZE,salt) & (FLOW_HASH_SIZE - 1); + return jhash(mybytes,FLOWKEYSIZE,salt) & (hash_max - 1); } uint32_t sampled_hasher(const key_flow *key) { - return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port); + /* Last arg is UINT_MAX because sampled flow keeps track of its own capacity. */ + return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, UINT_MAX); } uint32_t standard_hasher(const key_flow *key) { - return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port); + return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, STD_FLOW_HASH_SIZE); +} + +uint32_t multiple_hasher(const key_flow *key) { + return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, MUL_FLOW_HASH_SIZE); } struct intr_id { @@ -418,6 +459,13 @@ static int _output_drl(ulog_iret_t *res) /* Update the identity's table. */ ident->table_sample_function(ident->table, &key); +#ifdef SHADOW_ACCTING + + /* Update the shadow perfect copy of the accounting table. 
*/ + standard_table_sample((standard_flow_table) ident->shadow_table, &key); + +#endif + pthread_mutex_unlock(&ident->table_mutex); ident = ident->parent; @@ -455,6 +503,14 @@ static void free_identity(identity_t *ident) { ident->table_destroy_function(ident->table); } + if (ident->loop_action) { + ident->loop_action->valid = 0; + } + + if (ident->comm_action) { + ident->comm_action->valid = 0; + } + pthread_mutex_destroy(&ident->table_mutex); free(ident); @@ -483,6 +539,8 @@ static void free_instance(drl_instance_t *instance) { free(instance->machines); if (instance->sets) free(instance->sets); + + /* FIXME: Drain the calendar first and free all the entries. */ if (instance->cal) { free(instance->cal); } @@ -523,9 +581,10 @@ static identity_t *new_identity(ident_config *config) { ident->id = config->id; ident->limit = (uint32_t) (((double) config->limit * 1000.0) / 8.0); ident->fixed_ewma_weight = config->fixed_ewma_weight; - ident->intervals = config->intervals; + ident->communication_intervals = config->communication_intervals; + ident->mainloop_intervals = config->mainloop_intervals; ident->ewma_weight = pow(ident->fixed_ewma_weight, - (limiter.estintms/1000.0) * config->intervals); + (limiter.estintms/1000.0) * config->mainloop_intervals); ident->parent = NULL; pthread_mutex_init(&ident->table_mutex, NULL); @@ -546,10 +605,24 @@ static identity_t *new_identity(ident_config *config) { (void (*)(void *)) standard_table_destroy; break; + case ACT_MULTIPLE: + ident->table = + multiple_table_create(multiple_hasher, MUL_INTERVAL_COUNT, &ident->common); + + ident->table_sample_function = + (int (*)(void *, const key_flow *)) multiple_table_sample; + ident->table_cleanup_function = + (int (*)(void *)) multiple_table_cleanup; + ident->table_update_function = + (void (*)(void *, struct timeval, double)) multiple_table_update_flows; + ident->table_destroy_function = + (void (*)(void *)) multiple_table_destroy; + break; + case ACT_SAMPLEHOLD: ident->table = sampled_table_create(sampled_hasher, ident->limit * IDENT_CLEAN_INTERVAL, - 1, 20, &ident->common); + SAMPLEHOLD_PERCENTAGE, SAMPLEHOLD_OVERFACTOR, &ident->common); ident->table_sample_function = (int (*)(void *, const key_flow *)) sampled_table_sample; @@ -575,6 +648,18 @@ static identity_t *new_identity(ident_config *config) { break; } +#ifdef SHADOW_ACCTING + + ident->shadow_table = standard_table_create(standard_hasher, &ident->shadow_common); + + if (ident->shadow_table == NULL) { + ident->table_destroy_function(ident->table); + free(ident); + return NULL; + } + +#endif + /* Make sure the table was allocated. */ if (ident->table == NULL) { free(ident); @@ -621,10 +706,11 @@ static int validate_config(ident_config *config) { } /* Accounting must be a valid choice (ACT_STANDARD, ACT_SAMPLEHOLD, - * ACT_SIMPLE). */ + * ACT_SIMPLE, ACT_MULTIPLE). */ if (config->accounting != ACT_STANDARD && config->accounting != ACT_SAMPLEHOLD && - config->accounting != ACT_SIMPLE) { + config->accounting != ACT_SIMPLE && + config->accounting != ACT_MULTIPLE) { return 1; } @@ -823,12 +909,21 @@ static int init_identities(parsed_configs configs, drl_instance_t *instance) { /* Allocate and add the machine identities. 
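
/* Editor's note: illustrative sketch, not part of the patch.  Why
 * new_identity() raises the fixed 1-second EWMA weight to the power of the
 * interval length (estintms/1000 * mainloop_intervals).  The sample numbers
 * are invented. */

#include <stdio.h>
#include <math.h>

int main(void) {
    double fixed_ewma_weight  = 0.9;   /* weight if we updated once per second */
    double estintms           = 100.0; /* limiter tick, in milliseconds        */
    int    mainloop_intervals = 5;     /* ticks between estimate() calls       */

    /* Updating every 0.5 s instead of every 1 s means the old value should
     * decay less per update; w^t keeps the per-second decay rate constant. */
    double interval_seconds = (estintms / 1000.0) * mainloop_intervals;
    double ewma_weight      = pow(fixed_ewma_weight, interval_seconds);

    printf("effective EWMA weight per update: %.4f\n", ewma_weight);
    /* 0.9^0.5 ~= 0.9487, and applying it twice per second gives 0.9 again. */
    return 0;
}
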
*/ for (i = 0; i < configs.machine_count; ++i) { + identity_action *loop_action; + identity_action *comm_action; instance->machines[i] = new_identity(config); if (instance->machines[i] == NULL) { return ENOMEM; } + loop_action = malloc(sizeof(identity_action)); + comm_action = malloc(sizeof(identity_action)); + + if (loop_action == NULL || comm_action == NULL) { + return ENOMEM; + } + /* The first has no parent - it is the root. All others have the * previous ident as their parent. */ if (i == 0) { @@ -845,8 +940,23 @@ static int init_identities(parsed_configs configs, drl_instance_t *instance) { config = config->next; + memset(loop_action, 0, sizeof(identity_action)); + memset(comm_action, 0, sizeof(identity_action)); + loop_action->ident = instance->machines[i]; + loop_action->action = ACTION_MAINLOOP; + loop_action->valid = 1; + comm_action->ident = instance->machines[i]; + comm_action->action = ACTION_COMMUNICATE; + comm_action->valid = 1; + + instance->machines[i]->loop_action = loop_action; + instance->machines[i]->comm_action = comm_action; + TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK), - instance->machines[i], calendar); + loop_action, calendar); + + TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK), + comm_action, calendar); /* Setup the array of pointers to leaves. This is easy for machines * because a machine node applies to every leaf. */ @@ -874,12 +984,37 @@ static int init_identities(parsed_configs configs, drl_instance_t *instance) { /* Sets... */ for (i = 0; i < instance->set_count; ++i) { + identity_action *loop_action; + identity_action *comm_action; + if (instance->sets[i]->parent == NULL) { instance->sets[i]->parent = instance->last_machine; } + loop_action = malloc(sizeof(identity_action)); + comm_action = malloc(sizeof(identity_action)); + + if (loop_action == NULL || comm_action == NULL) { + return ENOMEM; + } + + memset(loop_action, 0, sizeof(identity_action)); + memset(comm_action, 0, sizeof(identity_action)); + loop_action->ident = instance->sets[i]; + loop_action->action = ACTION_MAINLOOP; + loop_action->valid = 1; + comm_action->ident = instance->sets[i]; + comm_action->action = ACTION_COMMUNICATE; + comm_action->valid = 1; + + instance->sets[i]->loop_action = loop_action; + instance->sets[i]->comm_action = comm_action; + + TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK), + loop_action, calendar); + TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK), - instance->sets[i], calendar); + comm_action, calendar); /* Setup the array of pointers to leaves. This is harder for sets, * but this doesn't need to be super-efficient because it happens @@ -915,7 +1050,7 @@ static void print_instance(drl_instance_t *instance) { static int assign_htb_hierarchy(drl_instance_t *instance) { int i, j; - int next_node = 0x11; + int next_node = 0x100; /* Chain machine nodes under 1:10. */ for (i = 0; i < instance->machine_count; ++i) { @@ -952,13 +1087,43 @@ static int assign_htb_hierarchy(drl_instance_t *instance) { /* Added this so that I could comment one line and kill off all of the * command execution. 
*/ -static int execute_cmd(const char *cmd) { +static inline int execute_cmd(const char *cmd) { return system(cmd); } +static inline int add_htb_node(const char *iface, const uint32_t parent_major, const uint32_t parent_minor, + const uint32_t classid_major, const uint32_t classid_minor, + const uint64_t rate, const uint64_t ceil) { + char cmd[300]; + + sprintf(cmd, "tc class add dev %s parent %x:%x classid %x:%x htb rate %llubit ceil %llubit", + iface, parent_major, parent_minor, classid_major, classid_minor, rate, ceil); + printlog(LOG_WARN, "INIT: HTB_cmd: %s\n", cmd); + + return execute_cmd(cmd); +} + +static inline int add_htb_netem(const char *iface, const uint32_t parent_major, + const uint32_t parent_minor, const uint32_t handle, + const int loss, const int delay) { + char cmd[300]; + + sprintf(cmd, "/sbin/tc qdisc del dev %s parent %x:%x handle %x pfifo", iface, parent_major, + parent_minor, handle); + printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); + if (execute_cmd(cmd)) + printlog(LOG_DEBUG, "HTB_cmd: Previous deletion did not succeed.\n"); + + sprintf(cmd, "/sbin/tc qdisc replace dev %s parent %x:%x handle %x netem loss %d%% delay %dms", + iface, parent_major, parent_minor, handle, loss, delay); + printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); + return execute_cmd(cmd); +} + static int create_htb_hierarchy(drl_instance_t *instance) { char cmd[300]; int i, j, k; + uint64_t gigabit = 1024 * 1024 * 1024; /* Nuke the hierarchy. */ sprintf(cmd, "tc qdisc del dev eth0 root handle 1: htb"); @@ -971,36 +1136,25 @@ static int create_htb_hierarchy(drl_instance_t *instance) { return 1; } printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); - sprintf(cmd, "tc class add dev eth0 parent 1: classid 1:1 htb rate 1000mbit ceil 1000mbit"); - if (execute_cmd(cmd)) { + + if (add_htb_node("eth0", 1, 0, 1, 1, gigabit, gigabit)) return 1; - } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); - /* Add back 1:10. (Nodelimit : Megabits/sec -> bits/second)*/ + /* Add back 1:10. (Nodelimit : kilobits/sec -> bits/second)*/ if (limiter.nodelimit) { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:10 htb rate 8bit ceil %lubit", - (unsigned long) limiter.nodelimit * 1024 * 1024); + if (add_htb_node("eth0", 1, 1, 1, 0x10, 8, (uint64_t) limiter.nodelimit * 1024)) + return 1; } else { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:10 htb rate 8bit ceil 1000mbit"); - } - - if (execute_cmd(cmd)) { - return 1; + if (add_htb_node("eth0", 1, 1, 1, 0x10, 8, gigabit)) + return 1; } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); /* Add machines. */ for (i = 0; i < instance->machine_count; ++i) { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %lubit", - instance->machines[i]->htb_parent, - instance->machines[i]->htb_node, - (unsigned long) instance->machines[i]->limit * 1024 * 1024); - - if (execute_cmd(cmd)) { + if (add_htb_node("eth0", 1, instance->machines[i]->htb_parent, 1, + instance->machines[i]->htb_node, 8, instance->machines[i]->limit * 1024)) { return 1; } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); } #define LIMITEXEMPT @@ -1024,82 +1178,81 @@ static int create_htb_hierarchy(drl_instance_t *instance) { /* Add sets. 
*/ for (j = (instance->set_count - 1); j >= 0; --j) { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %lubit", - instance->sets[j]->htb_parent, - instance->sets[j]->htb_node, - (unsigned long) instance->sets[j]->limit * 1024 * 1024); - - if (execute_cmd(cmd)) { + if (add_htb_node("eth0", 1, instance->sets[j]->htb_parent, 1, + instance->sets[j]->htb_node, 8, instance->sets[j]->limit * 1024)) { return 1; } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); } /* Add leaves. FIXME: Set static sliver limit as ceil here! */ for (k = 0; k < instance->leaf_count; ++k) { if (instance->leaves[k].parent == NULL) { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:10 classid 1:1%x htb rate 8bit ceil %lubit", - instance->leaves[k].xid, - (unsigned long) 100 * 1024 * 1024); + if (add_htb_node("eth0", 1, 0x10, 1, (0x1000 | instance->leaves[k].xid), 8, gigabit)) + return 1; } else { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:1%x htb rate 8bit ceil %lubit", - instance->leaves[k].parent->htb_node, - instance->leaves[k].xid, - (unsigned long) 100 * 1024 * 1024); - } - - if (execute_cmd(cmd)) { - return 1; + if (add_htb_node("eth0", 1, instance->leaves[k].parent->htb_node, 1, (0x1000 | instance->leaves[k].xid), 8, gigabit)) + return 1; } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); - - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:20 classid 1:2%x htb rate 8bit ceil 1000mbit", - instance->leaves[k].xid); - if (execute_cmd(cmd)) { + /* Add exempt node for the leaf under 1:20 as 1:2 */ + if (add_htb_node("eth0", 1, 0x20, 1, (0x2000 | instance->leaves[k].xid), 8, gigabit)) return 1; - } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); } /* Add 1:1000 and 1:2000 */ if (instance->last_machine == NULL) { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:10 classid 1:1000 htb rate 8bit ceil 1000mbit"); + if (add_htb_node("eth0", 1, 0x10, 1, 0x1000, 8, gigabit)) + return 1; } else { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:1000 htb rate 8bit ceil 1000mbit", - instance->last_machine->htb_node); - } - - if (execute_cmd(cmd)) { - return 1; + if (add_htb_node("eth0", 1, instance->last_machine->htb_node, 1, 0x1000, 8, gigabit)) + return 1; } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:20 classid 1:2000 htb rate 8bit ceil 1000mbit"); - if (execute_cmd(cmd)) { + if (add_htb_node("eth0", 1, 0x20, 1, 0x2000, 8, gigabit)) return 1; - } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); /* Add 1:1fff and 1:2fff */ if (instance->last_machine == NULL) { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:10 classid 1:1fff htb rate 8bit ceil 1000mbit"); + if (add_htb_node("eth0", 1, 0x10, 1, 0x1fff, 8, gigabit)) + return 1; } else { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:1fff htb rate 8bit ceil 1000mbit", - instance->last_machine->htb_node); + if (add_htb_node("eth0", 1, instance->last_machine->htb_node, 1, 0x1fff, 8, gigabit)) + return 1; } - if (execute_cmd(cmd)) { + if (add_htb_node("eth0", 1, 0x20, 1, 0x2fff, 8, gigabit)) return 1; - } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:20 classid 1:2fff htb rate 8bit ceil 1000mbit"); - if (execute_cmd(cmd)) { - return 1; + /* Artifical delay or loss for experimentation. */ + if (netem_delay.u.value || netem_loss.u.value) { + if (!strcmp(netem_slice.u.string, "ALL")) { + /* By default, netem applies to all leaves. 
*/ + if (add_htb_netem("eth0", 1, 0x1000, 0x1000, netem_loss.u.value, netem_delay.u.value)) + return 1; + if (add_htb_netem("eth0", 1, 0x1fff, 0x1fff, netem_loss.u.value, netem_delay.u.value)) + return 1; + + for (k = 0; k < instance->leaf_count; ++k) { + if (add_htb_netem("eth0", 1, (0x1000 | instance->leaves[k].xid), + (0x1000 | instance->leaves[k].xid), netem_loss.u.value, netem_delay.u.value)) { + return 1; + } + + //FIXME: add exempt delay/loss here on 0x2000 ... ? + } + } else { + /* netem_slice is not the default ALL value. Only apply netem + * to the slice that is set in netem_slice.u.string. */ + uint32_t slice_xid; + + sscanf(netem_slice.u.string, "%x", &slice_xid); + + if (add_htb_netem("eth0", 1, slice_xid, slice_xid, netem_loss.u.value, netem_delay.u.value)) + return 1; + } } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); +#if 0 #ifdef DELAY40MS /* Only for artificial delay testing. */ sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo"); @@ -1118,6 +1271,7 @@ static int create_htb_hierarchy(drl_instance_t *instance) { sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:11fa handle 11fa netem loss 0 delay 40ms"); execute_cmd(cmd); /* End delay testing */ +#endif #endif return 0; @@ -1333,6 +1487,8 @@ static int init_drl(void) { return false; } + partition_set = partition.u.value; + pthread_rwlock_unlock(&limiter.limiter_lock); if (pthread_create(&limiter.udp_recv_thread, NULL, limiter_receive_thread, NULL)) { @@ -1512,12 +1668,14 @@ static void *signal_thread_func(void *args) { sigemptyset(&sigs); sigaddset(&sigs, SIGHUP); sigaddset(&sigs, SIGUSR1); + sigaddset(&sigs, SIGUSR2); pthread_sigmask(SIG_BLOCK, &sigs, NULL); while (1) { sigemptyset(&sigs); sigaddset(&sigs, SIGHUP); sigaddset(&sigs, SIGUSR1); + sigaddset(&sigs, SIGUSR2); err = sigwait(&sigs, &sig); @@ -1545,6 +1703,9 @@ static void *signal_thread_func(void *args) { } pthread_rwlock_unlock(&limiter.limiter_lock); break; + case SIGUSR2: + do_partition = !do_partition; + break; default: /* Intentionally blank. */ break; @@ -1562,6 +1723,7 @@ static void _drl_reg_op(void) sigemptyset(&signal_mask); sigaddset(&signal_mask, SIGHUP); sigaddset(&signal_mask, SIGUSR1); + sigaddset(&signal_mask, SIGUSR2); pthread_sigmask(SIG_BLOCK, &signal_mask, NULL); if (pthread_create(&signal_thread, NULL, &signal_thread_func, NULL) != 0) {
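
For readers following the partition-simulation change: the pieces added above amount to a dedicated signal thread that flips a flag consulted by the peer-communication code whenever SIGUSR2 arrives. The following standalone sketch shows that pattern in isolation; do_partition and partition_set mirror the globals referenced in the patch, while the main loop, logging, and everything else are illustrative only (compile with -pthread).

/* Sketch of the SIGUSR2 "partition toggle" pattern; not the DRL code itself. */
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <unistd.h>

static volatile int do_partition = 0;   /* flipped by SIGUSR2                 */
static int partition_set = 0xfffffff;   /* bitmask as read from ulogd.conf    */

static void *signal_thread(void *arg) {
    sigset_t sigs;
    int sig;

    (void) arg;
    sigemptyset(&sigs);
    sigaddset(&sigs, SIGUSR2);

    for (;;) {
        /* Wait synchronously for the signal instead of using a handler. */
        if (sigwait(&sigs, &sig) == 0 && sig == SIGUSR2) {
            do_partition = !do_partition;
            printf("partition simulation now %s (set=0x%x)\n",
                   do_partition ? "ON" : "OFF", partition_set);
        }
    }
    return NULL;
}

int main(void) {
    sigset_t mask;
    pthread_t tid;

    /* Block SIGUSR2 in every thread so only sigwait() ever sees it. */
    sigemptyset(&mask);
    sigaddset(&mask, SIGUSR2);
    pthread_sigmask(SIG_BLOCK, &mask, NULL);

    if (pthread_create(&tid, NULL, signal_thread, NULL) != 0)
        return 1;

    for (;;) {
        /* A real peer-communication loop would skip peers masked out by
         * partition_set whenever do_partition is set. */
        sleep(1);
    }
    return 0;
}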
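The broader structural change in init_identities() is that every identity now enqueues two independent calendar entries, an ACTION_MAINLOOP entry paced by mainloop_intervals and an ACTION_COMMUNICATE entry paced by communication_intervals, instead of queueing the identity itself. Below is a minimal sketch of that scheduling pattern; SCHEDLEN/SCHEDMASK and the ident_action fields echo calendar.h and the hunks above, but the SCHEDBITS value, the ident_id field, and the tick() driver are illustrative assumptions, not the module's actual code.

/* Dual-entry calendar sketch: each identity owns a main-loop action and a
 * communication action, requeued independently after each activation. */
#include <stdio.h>
#include <stdlib.h>
#include <sys/queue.h>

#define SCHEDBITS 4                      /* arbitrary for this sketch */
#define SCHEDLEN  (1 << SCHEDBITS)
#define SCHEDMASK (SCHEDLEN - 1)

enum action_type { ACTION_MAINLOOP, ACTION_COMMUNICATE };

struct ident_action {
    int ident_id;                        /* which identity this entry drives   */
    enum action_type action;             /* main loop or peer communication    */
    int intervals;                       /* limiter ticks between activations
                                          * (assumed 0 < intervals < SCHEDLEN) */
    int valid;                           /* cleared when the identity is freed */
    TAILQ_ENTRY(ident_action) calendar;  /* linkage within a calendar slot     */
};

TAILQ_HEAD(ident_calendar, ident_action);

static struct ident_calendar cal[SCHEDLEN];

static void schedule(struct ident_action *a, unsigned int slot) {
    TAILQ_INSERT_TAIL(&cal[slot & SCHEDMASK], a, calendar);
}

/* One limiter interval: drain the current slot, requeue live actions ahead. */
static void tick(unsigned int slot) {
    struct ident_action *a;

    while ((a = TAILQ_FIRST(&cal[slot & SCHEDMASK])) != NULL) {
        TAILQ_REMOVE(&cal[slot & SCHEDMASK], a, calendar);

        if (!a->valid) {                 /* identity was freed; drop entry */
            free(a);
            continue;
        }

        if (a->action == ACTION_MAINLOOP)
            printf("tick %u: ident %d estimate/allocate/enforce\n", slot, a->ident_id);
        else
            printf("tick %u: ident %d exchange state with peers\n", slot, a->ident_id);

        schedule(a, slot + a->intervals);
    }
}

int main(void) {
    unsigned int slot, i;
    struct ident_action *loop = calloc(1, sizeof(*loop));
    struct ident_action *comm = calloc(1, sizeof(*comm));

    if (loop == NULL || comm == NULL)
        return 1;

    for (i = 0; i < SCHEDLEN; i++)
        TAILQ_INIT(&cal[i]);

    loop->ident_id = 1; loop->action = ACTION_MAINLOOP;    loop->intervals = 1; loop->valid = 1;
    comm->ident_id = 1; comm->action = ACTION_COMMUNICATE; comm->intervals = 3; comm->valid = 1;

    schedule(loop, 0);
    schedule(comm, 0);

    for (slot = 0; slot < 8; slot++)
        tick(slot);

    return 0;
}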
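As a usage note on the loss/delay emulation: with the format strings used by add_htb_netem() above, a ulogd.conf that sets, for example, netem_loss to 5 and netem_delay to 40 while leaving netem_slice at its default of "ALL" would issue commands of the following shape for 1:1000, 1:1fff, and each leaf class (the numbers here are only an example):

/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo
/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 5% delay 40ms

When netem_slice is instead set to a specific hex classid, the string is parsed with sscanf("%x") and only that class receives the netem qdisc.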