Added an accounting type called "multipleinterval" that makes decisions based on a window of history.
Separated communication from the "system tick". Now identities can specify separate intervals for accounting
and communication.
Added the ability to do "shadow accounting" when using sample and hold to see what would have happened using
regular accounting.
Cleaned up the FPS allocate code a lot (finally)!
Added partition simulation via SIGUSR2 and a partition_set variable in ulogd.conf (mesh only, gossip to come).
Added the option of emulating loss/delay (via netem) on a slice for testing.
Some fixes to sample&hold and a few other small things.
#
SHARED_LIBS=ulogd_DRL.so
-OBJECTS=config.o drl_state.o estimate.o logging.o peer_comm.o samplehold.o simple.o standard.o ulogd_DRL.o util.o
+OBJECTS=config.o drl_state.o estimate.o logging.o multipleinterval.o peer_comm.o samplehold.o simple.o standard.o ulogd_DRL.o util.o
all: $(SHARED_LIBS)
#define SCHEDLEN (1 << SCHEDBITS)
#define SCHEDMASK (SCHEDLEN - 1)
-/** Defines a struct ident_calendar whose elements are of type struct identity
- * (identity_t) */
-TAILQ_HEAD(ident_calendar, identity);
-
+/** Defines a struct ident_calendar whose elements are of type struct
+ * ident_action. */
+TAILQ_HEAD(ident_calendar, ident_action);
#endif /* _CALENDAR_H_ */
xmlChar *branch;
xmlChar *accounting;
xmlChar *ewma;
- xmlChar *intervals;
+ xmlChar *mainloop_intervals;
+ xmlChar *communication_intervals;
xmlNodePtr fields = ident->children;
ident_peer *current = NULL;
common->accounting = ACT_SAMPLEHOLD;
} else if (!xmlStrcmp(accounting, (const xmlChar *) "SIMPLE")) {
common->accounting = ACT_SIMPLE;
+ } else if (!xmlStrcmp(accounting, (const xmlChar *) "MULTIPLEINTERVAL")) {
+ common->accounting = ACT_MULTIPLE;
} else {
printlog(LOG_CRITICAL, "Unknown/invalid accounting table.\n");
xmlFree(accounting);
xmlFree(ewma);
}
- intervals = xmlGetProp(ident, (const xmlChar *) "intervals");
- if (intervals == NULL) {
- printlog(LOG_CRITICAL, "Ident missing interval count.\n");
- return EINVAL;
+ mainloop_intervals = xmlGetProp(ident, (const xmlChar *) "loop_intervals");
+ if (mainloop_intervals == NULL) {
+ printlog(LOG_WARN, "Ident id: %d missing loop_intervals, assuming 1.\n", common->id);
+ common->mainloop_intervals = 1;
+ } else {
+ common->mainloop_intervals = atoi((const char *) mainloop_intervals);
+ xmlFree(mainloop_intervals);
+ }
+
+ communication_intervals = xmlGetProp(ident, (const xmlChar *) "comm_intervals");
+ if (communication_intervals == NULL) {
+ printlog(LOG_WARN, "Ident id: %d missing comm_intervals, assuming 1.\n", common->id);
+ common->communication_intervals = 1;
} else {
- common->intervals = atoi((const char *) intervals);
- xmlFree(intervals);
+ common->communication_intervals = atoi((const char *) communication_intervals);
+ xmlFree(communication_intervals);
}
while (fields != NULL) {
/** The fixed (1-second) ewma weight value for this identity. */
double fixed_ewma_weight;
- /** The number of estimate intervals to wait between calls to estimate,
+ /** The number of limiter intervals to wait between calls to estimate,
* allocate and enforce. */
- int intervals;
+ int mainloop_intervals;
+
+ /** The number of limiter intervals to wait between communication. */
+ int communication_intervals;
/** The type of this identity. */
enum ident_types type;
sigemptyset(&signal_mask);
sigaddset(&signal_mask, SIGHUP);
sigaddset(&signal_mask, SIGUSR1);
+ sigaddset(&signal_mask, SIGUSR2);
pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
#include "raterouter.h"
#include "util.h"
#include "ratetypes.h" /* needs util and pthread.h */
+#include "calendar.h"
#include "logging.h"
-#define PRINT_COUNTER_RESET (0)
-
extern uint8_t system_loglevel;
-static int printcounter = PRINT_COUNTER_RESET - 1;
uint8_t do_enforcement = 0;
* to estimate the current aggregate rate and the rate of the individual flows
* in the table.
*/
-static void estimate(identity_t *ident) {
+static void estimate(identity_t *ident, const double estintms) {
struct timeval now;
+ double time_difference;
+ pthread_mutex_lock(&ident->table_mutex); /* CLUNK ! */
+
gettimeofday(&now, NULL);
- pthread_mutex_lock(&ident->table_mutex); /* CLUNK ! */
+ time_difference = timeval_subtract(now, ident->common.last_update);
+
+ if (time_difference > 1.05 * (estintms / 1000 * ident->mainloop_intervals)) {
+ printlog(LOG_WARN, "Missed interval: Scheduled for %.2f ms, actual %.2fms\n",
+ estintms * ident->mainloop_intervals, time_difference * 1000);
+ }
ident->table_update_function(ident->table, now, ident->ewma_weight);
+#ifdef SHADOW_ACCTING
+
+ standard_table_update_flows((standard_flow_table) ident->shadow_table, now,
+ ident->ewma_weight);
+
+#endif
+
pthread_mutex_unlock(&ident->table_mutex); /* CLINK ! */
}
* Determines the FPS weight allocation when the identity is under its current
* local rate limit.
*/
-static double allocate_fps_under_limit(identity_t *ident, uint32_t local_rate, double peer_weights) {
- uint32_t target = local_rate;
+static double allocate_fps_under_limit(identity_t *ident, uint32_t target, double peer_weights) {
double ideal_weight;
double total_weight = peer_weights + ident->last_localweight;
- if (ident->flowstart) {
- target = local_rate*4;
- if (local_rate >= FLOW_START_THRESHOLD) {
- ident->flowstart = false;
- }
- }
- else {
- /* June 16, 2008 (KCW)
- * ident->flowstart gets set initially to one, but it is never set again. However,
- * if a limiter gets flows and then the number of flows drops to zero, it has trouble
- * increasing the limit again. */
- if (local_rate < FLOW_START_THRESHOLD) {
- ident->flowstart = true;
- }
- }
-
if (target >= ident->limit) {
ideal_weight = total_weight;
} else if (target <= 0) {
*/
static double allocate_fps_over_limit(identity_t *ident) {
double ideal_weight;
+ double total_over_max;
if (ident->common.max_flow_rate > 0) {
ideal_weight = (double) ident->locallimit / (double) ident->common.max_flow_rate;
+ total_over_max = (double) ident->common.rate / (double) ident->common.max_flow_rate;
- printlog(LOG_DEBUG, "%.3f %d %d %d FlowCount, Limit, MaxRate, TotalRate\n",
- ideal_weight, ident->locallimit, ident->common.max_flow_rate, ident->common.rate);
+ printlog(LOG_DEBUG, "ideal_over: %.3f, limit: %d, max_flow_rate: %d, total_rate: %d, total/max: %.3f\n",
+ ideal_weight, ident->locallimit, ident->common.max_flow_rate, ident->common.rate, total_over_max);
} else {
ideal_weight = 1;
}
return ideal_weight;
}
+/**
+ * When FPS checks to see which mode it should be operating in
+ * (over limit vs under limit), we don't want it to actually look to
+ * see if we're at the limit. Instead, we want to see if we're getting
+ * close to the limit. This defines how close is "close enough".
+ *
+ * For example, if the limit is 50000 and we're sending 49000, we probably
+ * want to be in the over limit mode, even if we aren't actually over the limit
+ * in order to switch to the more aggressive weight calculations.
+ *
+ * The threshold returned is limit * CLOSE_ENOUGH, except that the gap between
+ * the limit and the threshold is never allowed to be smaller than 2500: when
+ * the scaled gap is below 2500 the function returns limit - 2500 instead.
+ *
+ * NOTE(review): limit is unsigned, so for limit < 2500 the expression
+ * limit - 2500 looks like it wraps around to a very large value, which would
+ * make any "rate < close_enough(limit)" comparison true. Confirm whether
+ * callers rely on that for very small or zero limits before changing it.
+ */
+static inline uint32_t close_enough(uint32_t limit) {
+ /* Gap between the limit and the scaled threshold, truncated to an integer. */
+ uint32_t difference = limit - (limit * CLOSE_ENOUGH);
+
+ if (difference < 2500) {
+ /* Scaled gap is too small; enforce a fixed gap of 2500 instead. */
+ return (limit - 2500);
+ } else {
+ return (limit * CLOSE_ENOUGH);
+ }
+}
+
+/**
+ * Logs one line of per-interval allocation statistics for an identity and,
+ * when the system log level is LOG_DEBUG, also prints a short summary to
+ * stdout.
+ *
+ * The log line is emitted with two printlog() calls: the first writes the
+ * timestamp, rates, weights, flow counters and limit; the second appends the
+ * ratio rate / max_flow_rate (or 0 when max_flow_rate is not positive) and
+ * terminates the line.
+ *
+ * @param ident           Identity being reported; only its id is read.
+ * @param ideal_weight    Ideal weight computed for this interval.
+ * @param total_weight    Total weight computed for this interval.
+ * @param localweight     Local weight to report. It is used for both the log
+ *                        line and the debug summary, so a caller reporting on
+ *                        a table other than the identity's live one sees its
+ *                        own weight rather than ident->localweight.
+ * @param identifier      Label printed in front of the identity id.
+ * @param table           Accounting table the rates and counters are read from.
+ * @param resulting_limit Rate limit derived from the weights.
+ */
+static void print_statistics(identity_t *ident, const double ideal_weight,
+                             const double total_weight, const double localweight,
+                             const char *identifier, common_accounting_t *table,
+                             const uint32_t resulting_limit) {
+    struct timeval tv;
+    double time_now;
+
+    gettimeofday(&tv, NULL);
+    time_now = (double) tv.tv_sec + (double) tv.tv_usec / 1000000.0;
+
+    printlog(LOG_WARN, "%.2f %d %.2f %.2f %.2f %d %d %d %d %d %d %d %d %d %s:%d ",
+             time_now, table->inst_rate, ideal_weight, localweight, total_weight,
+             table->num_flows, table->num_flows_5k, table->num_flows_10k,
+             table->num_flows_20k, table->num_flows_50k, table->avg_rate,
+             table->max_flow_rate, table->max_flow_rate_flow_hash, resulting_limit,
+             identifier, ident->id);
+
+    if (table->max_flow_rate > 0) {
+        printlog(LOG_WARN, "%.3f\n", (double) table->rate / (double) table->max_flow_rate);
+    } else {
+        printlog(LOG_WARN, "0\n");
+    }
+
+    /* Print to the screen in debug mode. Report the weight that was passed
+     * in, not ident->localweight, so the summary matches the table above. */
+    if (system_loglevel == LOG_DEBUG) {
+        printf("Local Rate: %d, Ideal Weight: %.3f, Local Weight: %.3f, Total Weight: %.3f\n",
+               table->rate, ideal_weight, localweight, total_weight);
+    }
+}
+
+/**
+ * Computes this identity's FPS weight for the current interval and converts
+ * it into a local rate limit.
+ *
+ * Side effects on ident: dampen_state, localweight and total_weight are
+ * written. last_localweight is only read here.
+ *
+ * @param ident        Identity being allocated.
+ * @param total_weight Total weight; the identity's last_localweight is
+ *                     subtracted from it to obtain the peers' weight.
+ * @param table        Accounting table supplying rate, inst_rate and
+ *                     last_inst_rate.
+ * @param identifier   Label passed through to print_statistics().
+ * @return localweight * limit / total_weight using the newly computed total,
+ *         or limit / (remote_node_count + 1) when that total is not positive.
+ */
+static uint32_t allocate_fps(identity_t *ident, double total_weight,
+ common_accounting_t *table, const char *identifier) {
+
+ uint32_t resulting_limit = 0;
+ double ideal_weight = 0.0;
+ double peer_weights = total_weight - ident->last_localweight;
+
+ /* Keep track of these for measurements & comparisons only. */
+ double ideal_under = 0.0;
+ double ideal_over = 0.0;
+
+ /* Weight sanity. */
+ if (peer_weights < 0.0) {
+ peer_weights = 0.0;
+ }
+
+ /* A dampened increase was applied last interval: decide whether it paid
+ * off by comparing the change in instantaneous rate to a threshold. */
+ if (ident->dampen_state == DAMPEN_TEST) {
+ int64_t rate_delta = (int64_t) table->inst_rate - (int64_t) table->last_inst_rate;
+ double threshold = (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
+
+ if (rate_delta > threshold) {
+ ident->dampen_state = DAMPEN_PASSED;
+ } else {
+ ident->dampen_state = DAMPEN_FAILED;
+ }
+ }
+
+ /* Rate/weight sanity. Note that only ideal_weight (used for reporting) is
+ * zeroed here; ident->localweight is left unchanged in this case. */
+ if (table->rate <= 0) {
+ ideal_weight = 0.0;
+ }
+
+ /* Under the limit OR we failed our dampening test OR our current
+ * outgoing traffic rate is under the low "flowstart" watermark.
+ * (The flowstart part of the condition is compiled out below.) */
+ else if (ident->dampen_state == DAMPEN_FAILED ||
+ table->rate < close_enough(ident->locallimit)) {
+#if 0
+ || ident->flowstart) {
+ uint32_t target_rate = table->rate;
+
+ if (ident->flowstart) {
+ target_rate *= 4;
+
+ if (table->rate >= FLOW_START_THRESHOLD) {
+ ident->flowstart = false;
+ }
+ } else {
+ /* June 16, 2008 (KCW)
+ * ident->flowstart gets set initially to one, but it is never set again. However,
+ * if a limiter gets flows and then the number of flows drops to zero, it has trouble
+ * increasing the limit again. */
+ if (table->rate < FLOW_START_THRESHOLD) {
+ ident->flowstart = true;
+ }
+ }
+ Old flowstart code.
+#endif
+
+ /* Boost low-limits so that they have room to grow. */
+ if (table->rate < FLOW_START_THRESHOLD) {
+ ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate * 4, peer_weights);
+ } else {
+ ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights);
+ }
+
+ ideal_over = allocate_fps_over_limit(ident);
+
+ if (ideal_over < ideal_under) {
+ /* Degenerate case in which the aggressive weight calculation was
+ * actually less than the under-the-limit case. Use it instead
+ * and skip the dampening check in the next interval. */
+ ideal_weight = ideal_over;
+ ident->dampen_state = DAMPEN_SKIP;
+ } else {
+ ident->dampen_state = DAMPEN_NONE;
+ }
+
+ /* Apply EWMA. */
+ ident->localweight = (ident->localweight * ident->ewma_weight +
+ ideal_weight * (1 - ident->ewma_weight));
+ }
+
+ /* At or over the limit. Use the aggressive weight calculation. */
+ else {
+ double portion_last_interval = 0.0;
+ double portion_this_interval = 0.0;
+
+ ideal_weight = ideal_over = allocate_fps_over_limit(ident);
+ /* Computed for comparison only; not used to set the weight here. */
+ ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights);
+
+ /* Apply EWMA. */
+ ident->localweight = (ident->localweight * ident->ewma_weight +
+ ideal_weight * (1 - ident->ewma_weight));
+
+ /* Now check whether the result of the aggressive weight calculation
+ * increases our portion of the weight "too much", in which case we
+ * dampen it. */
+
+ /* Our portion of weight in the whole system during the last interval.
+ * NOTE(review): total_weight is not checked for zero before this
+ * division - confirm callers never pass 0 here. */
+ portion_last_interval = ident->last_localweight / total_weight;
+
+ /* Our proposed portion of weight for the current interval. */
+ portion_this_interval = ident->localweight / (peer_weights + ident->localweight);
+
+ if (ident->dampen_state == DAMPEN_NONE &&
+ (portion_this_interval - portion_last_interval > LARGE_INCREASE_PERCENTAGE)) {
+ /* Cap the increase and test its effect in the next interval. */
+ ident->localweight = ident->last_localweight + (LARGE_INCREASE_PERCENTAGE * total_weight);
+ ident->dampen_state = DAMPEN_TEST;
+ } else {
+ ident->dampen_state = DAMPEN_SKIP;
+ }
+ }
+
+ /* Add the weight calculated in this interval to the total. */
+ ident->total_weight = total_weight = ident->localweight + peer_weights;
+
+ /* Convert weight value into a rate limit. If there is no measureable
+ * weight, do a L/n allocation. */
+ if (total_weight > 0) {
+ resulting_limit = (uint32_t) (ident->localweight * ident->limit / total_weight);
+ } else {
+ resulting_limit = (uint32_t) (ident->limit / (ident->comm.remote_node_count + 1));
+ }
+
+ print_statistics(ident, ideal_weight, total_weight, ident->localweight,
+ identifier, table, resulting_limit);
+
+ return resulting_limit;
+}
+
+#ifdef SHADOW_ACCTING
+
+/* Shadow version of allocate_fps(): runs the same weight/limit calculation
+ * against a second accounting table so that two accounting schemes
+ * (especially standard vs. sample&hold) can be compared.
+ *
+ * The identity's live allocation state (localweight, last_localweight,
+ * dampen_state, total_weight) is never written here. Only the shadow fields
+ * localweight_copy and dampen_state_copy are updated, and every "previous
+ * weight" read uses last_localweight_copy so the shadow run is not
+ * contaminated by the live weight. The computed limit is reported through
+ * print_statistics() and otherwise discarded.
+ *
+ * ident:        identity whose shadow state is advanced.
+ * total_weight: total weight; last_localweight_copy is subtracted from it to
+ *               obtain the peers' weight.
+ * table:        accounting table supplying rate, inst_rate, last_inst_rate.
+ * identifier:   label passed through to print_statistics(). */
+static void allocate_fps_pretend(identity_t *ident, double total_weight,
+                                 common_accounting_t *table, const char *identifier) {
+
+    uint32_t resulting_limit = 0;
+    double ideal_weight = 0.0;
+    double peer_weights = total_weight - ident->last_localweight_copy;
+    double ideal_under = 0.0;
+    double ideal_over = 0.0;
+
+    /* Weight sanity. */
+    if (peer_weights < 0.0) {
+        peer_weights = 0.0;
+    }
+
+    /* A dampened increase was applied last interval: decide whether it paid
+     * off by comparing the change in instantaneous rate to a threshold. */
+    if (ident->dampen_state_copy == DAMPEN_TEST) {
+        int64_t rate_delta = (int64_t) table->inst_rate - (int64_t) table->last_inst_rate;
+        double threshold = (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
+
+        if (rate_delta > threshold) {
+            ident->dampen_state_copy = DAMPEN_PASSED;
+        } else {
+            ident->dampen_state_copy = DAMPEN_FAILED;
+        }
+    }
+
+    /* Rate/weight sanity. Only ideal_weight (used for reporting) is zeroed;
+     * localweight_copy is left unchanged in this case. */
+    if (table->rate <= 0) {
+        ideal_weight = 0.0;
+    }
+
+    /* Under the limit OR we failed our dampening test. */
+    else if (ident->dampen_state_copy == DAMPEN_FAILED ||
+             table->rate < close_enough(ident->locallimit)) {
+
+        /* Boost low-limits so that they have room to grow. */
+        if (table->rate < FLOW_START_THRESHOLD) {
+            ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate * 4, peer_weights);
+        } else {
+            ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights);
+        }
+
+        ideal_over = allocate_fps_over_limit(ident);
+
+        if (ideal_over < ideal_under) {
+            /* Degenerate case in which the aggressive weight calculation was
+             * actually less than the under-the-limit case. Use it instead
+             * and skip the dampening check in the next interval. */
+            ideal_weight = ideal_over;
+            ident->dampen_state_copy = DAMPEN_SKIP;
+        } else {
+            ident->dampen_state_copy = DAMPEN_NONE;
+        }
+
+        /* Apply EWMA. */
+        ident->localweight_copy = (ident->localweight_copy * ident->ewma_weight +
+                                   ideal_weight * (1 - ident->ewma_weight));
+    }
+
+    /* At or over the limit. Use the aggressive weight calculation. */
+    else {
+        double portion_last_interval = 0.0;
+        double portion_this_interval = 0.0;
+
+        ideal_weight = ideal_over = allocate_fps_over_limit(ident);
+        /* Computed for comparison only; not used to set the weight here. */
+        ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights);
+
+        /* Apply EWMA. */
+        ident->localweight_copy = (ident->localweight_copy * ident->ewma_weight +
+                                   ideal_weight * (1 - ident->ewma_weight));
+
+        /* Now check whether the result of the aggressive weight calculation
+         * increases our portion of the weight "too much", in which case we
+         * dampen it. */
+
+        /* Our (shadow) portion of weight in the whole system during the last
+         * interval. Uses the shadow copy, matching peer_weights above. */
+        portion_last_interval = ident->last_localweight_copy / total_weight;
+
+        /* Our proposed portion of weight for the current interval. */
+        portion_this_interval = ident->localweight_copy / (peer_weights + ident->localweight_copy);
+
+        if (ident->dampen_state_copy == DAMPEN_NONE &&
+            (portion_this_interval - portion_last_interval > LARGE_INCREASE_PERCENTAGE)) {
+            /* Cap the increase relative to the previous shadow weight and
+             * test its effect in the next interval. */
+            ident->localweight_copy = ident->last_localweight_copy + (LARGE_INCREASE_PERCENTAGE * total_weight);
+            ident->dampen_state_copy = DAMPEN_TEST;
+        } else {
+            ident->dampen_state_copy = DAMPEN_SKIP;
+        }
+    }
+
+    /* Add the weight calculated in this interval to the total. */
+    total_weight = ident->localweight_copy + peer_weights;
+
+    /* Convert weight value into a rate limit. If there is no measureable
+     * weight, do a L/n allocation. */
+    if (total_weight > 0) {
+        resulting_limit = (uint32_t) (ident->localweight_copy * ident->limit / total_weight);
+    } else {
+        resulting_limit = (uint32_t) (ident->limit / (ident->comm.remote_node_count + 1));
+    }
+
+    print_statistics(ident, ideal_weight, total_weight, ident->localweight_copy,
+                     identifier, table, resulting_limit);
+}
+
+#endif
+
/**
* Determines the amount of FPS weight to allocate to the identity during each
* estimate interval. Note that total_weight includes local weight.
*/
-static uint32_t allocate_fps(identity_t *ident, double total_weight) {
+static uint32_t allocate_fps_old(identity_t *ident, double total_weight) {
common_accounting_t *ftable = &ident->common; /* Common flow table info */
uint32_t local_rate = ftable->rate;
uint32_t ideallocal = 0;
if (local_rate <= 0) {
idealweight = 0;
} else if (dampen_increase == 0 &&
- (ident->locallimit <= 0 || local_rate < (ident->locallimit * CLOSE_ENOUGH) || ident->flowstart)) {
+ (ident->locallimit <= 0 || local_rate < close_enough(ident->locallimit) || ident->flowstart)) {
/* We're under the limit - all flows are bottlenecked. */
idealweight = allocate_fps_under_limit(ident, local_rate, peer_weights);
ideal_over = allocate_fps_over_limit(ident);
local_rate, idealweight, ident->localweight, total_weight);
}
+#if 0
if (printcounter <= 0) {
struct timeval tv;
double time_now;
printlog(LOG_DEBUG, "MIN: min said to use flow counting, which was %.3f when other method said %.3f.\n",
ideal_over, ideal_under);
}
+ See print_statistics()
+#endif
printlog(LOG_DEBUG, "ideallocal is %d\n", ideallocal);
ident->avg_bytes += ident->common.rate;
if (limiter->policy == POLICY_FPS) {
- ident->locallimit = allocate_fps(ident, comm_val);
+#ifdef SHADOW_ACCTING
+
+ allocate_fps_pretend(ident, comm_val, &ident->shadow_common, "SHADOW-ID");
+
+ ident->last_localweight_copy = ident->localweight_copy;
+#endif
+
+ ident->locallimit = allocate_fps(ident, comm_val, &ident->common, "ID");
ident->last_localweight = ident->localweight;
-
+
/* Update other limiters with our weight by writing to comm layer. */
write_local_value(&ident->comm, ident->localweight);
} else {
}
printlog(LOG_DEBUG, "%d Limit ID:%d\n", ident->locallimit, ident->id);
+#if 0
if (printcounter == PRINT_COUNTER_RESET) {
- printlog(LOG_WARN, "%d\n", ident->locallimit);
+ if (ident->common.max_flow_rate > 0) {
+ printlog(LOG_WARN, "%d ID:%d %.3f\n", ident->locallimit, ident->id,
+ (double) ident->common.rate / (double) ident->common.max_flow_rate);
+ } else {
+ printlog(LOG_WARN, "%d ID:%d 0\n", ident->locallimit, ident->id);
+ }
}
+ This is now done in print_statistics()
+#endif
snprintf(cmd, CMD_BUFFER_SIZE,
"/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %dbps quantum 1600",
if (ret) {
/* FIXME: call failed. What to do? */
- printlog(LOG_CRITICAL, "***TC call failed?***\n");
+ printlog(LOG_CRITICAL, "***TC call failed? Call was: %s***\n", cmd);
}
}
break;
ident->table_cleanup_function(ident->table);
+#ifdef SHADOW_ACCTING
+
+ standard_table_cleanup((standard_flow_table) ident->shadow_table);
+
+#endif
+
pthread_mutex_unlock(&ident->table_mutex);
}
* of identities.
*
* Each identity also has a private lock for its table. This gets locked by
- * table-modifying functions such as estimate and clean.
+ * table-modifying functions such as estimate and clean. It's also locked in
+ * ulogd_DRL.c when the table is being updated with new packets.
*/
void handle_estimation(void *arg) {
limiter_t *limiter = (limiter_t *) arg;
- identity_t *ident = NULL;
int clean_timer, clean_wait_intervals;
useconds_t sleep_time = limiter->estintms * 1000;
uint32_t cal_slot = 0;
sigemptyset(&signal_mask);
sigaddset(&signal_mask, SIGHUP);
sigaddset(&signal_mask, SIGUSR1);
+ sigaddset(&signal_mask, SIGUSR2);
pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
/* Determine the number of intervals we should wait before hitting the
clean_timer = clean_wait_intervals;
while (true) {
+ printlog(LOG_DEBUG, "--Beginning new tick.--\n");
+
/* Sleep according to the delay of the estimate interval. */
usleep(sleep_time);
/* Service all the identities that are scheduled to run during this
* tick. */
while (!TAILQ_EMPTY(limiter->stable_instance.cal + cal_slot)) {
- ident = TAILQ_FIRST(limiter->stable_instance.cal + cal_slot);
- TAILQ_REMOVE(limiter->stable_instance.cal + cal_slot, ident, calendar);
+ identity_action *iaction = TAILQ_FIRST(limiter->stable_instance.cal + cal_slot);
+ TAILQ_REMOVE(limiter->stable_instance.cal + cal_slot, iaction, calendar);
- /* Update the ident's flow accouting table with the latest info. */
- estimate(ident);
+ /* Only execute the action if it is valid. */
+ if (iaction->valid == 0) {
+ free(iaction);
+ continue;
+ }
+
+ switch (iaction->action) {
+ case ACTION_MAINLOOP:
+
+ printlog(LOG_DEBUG, "Main loop: identity %d\n", iaction->ident->id);
+
+ /* Update the ident's flow accounting table with the latest info. */
+ estimate(iaction->ident, limiter->estintms);
+
+ /* Determine its share of the rate allocation. */
+ allocate(limiter, iaction->ident);
- /* Determine its share of the rate allocation. */
- allocate(limiter, ident);
+ /* Make tc calls to enforce the rate we decided upon. */
+ enforce(limiter, iaction->ident);
- /* Make tc calls to enforce the rate we decided upon. */
- enforce(limiter, ident);
+ /* Add ident back to the queue at a future time slot. */
+ TAILQ_INSERT_TAIL(limiter->stable_instance.cal +
+ ((cal_slot + iaction->ident->mainloop_intervals) & SCHEDMASK),
+ iaction, calendar);
+ break;
- /* Tell the comm library to propagate this identity's result for
- * this interval.*/
- send_update(&ident->comm, ident->id);
+ case ACTION_COMMUNICATE:
- /* Add ident back to the queue at a future time slot. */
- TAILQ_INSERT_TAIL(limiter->stable_instance.cal +
- ((cal_slot + ident->intervals) & SCHEDMASK),
- ident, calendar);
+ printlog(LOG_DEBUG, "Communicating: identity %d\n", iaction->ident->id);
+
+ /* Tell the comm library to propagate this identity's result for
+ * this interval.*/
+ send_update(&iaction->ident->comm, iaction->ident->id);
+
+ /* Add ident back to the queue at a future time slot. */
+ TAILQ_INSERT_TAIL(limiter->stable_instance.cal +
+ ((cal_slot + iaction->ident->communication_intervals) & SCHEDMASK),
+ iaction, calendar);
+ break;
+
+ default:
+ printlog(LOG_CRITICAL, "Unknown identity action!?!\n");
+ exit(EXIT_FAILURE);
+ }
}
print_interval--;
--- /dev/null
+/* See the DRL-LICENSE file for this file's software license. */
+
+#include <arpa/inet.h>
+#include <assert.h>
+#include <inttypes.h>
+#include <netinet/in.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <time.h>
+
+#include "common_accounting.h"
+#include "multipleinterval.h"
+#include "logging.h"
+
+/**
+ * Allocates and initializes a multiple-interval flow table.
+ *
+ * The table owns a ring of interval_count interval records linked through
+ * their next pointers (the last one points back to the first). Slot 0 is
+ * marked valid, stamped with the creation time and made the current
+ * interval. common->last_update is set to the creation time as well.
+ *
+ * @param hash_function  Hash used to pick a bucket for a flow key.
+ * @param interval_count Number of interval records in the ring; must be at
+ *                       least 1 because slot 0 is always initialized.
+ * @param common         Shared accounting structure this table reports into;
+ *                       must not be NULL.
+ * @return The new table, or NULL if an argument is invalid or memory could
+ *         not be allocated. Release with multiple_table_destroy().
+ */
+multiple_flow_table multiple_table_create(uint32_t (*hash_function)(const key_flow *key), uint32_t interval_count, common_accounting_t *common) {
+    uint32_t i;
+    multiple_flow_table table;
+
+    /* Slot 0 of the ring is written unconditionally below and common is
+     * dereferenced, so reject inputs that would make either invalid. */
+    if (interval_count == 0 || common == NULL) {
+        return NULL;
+    }
+
+    table = calloc(1, sizeof(struct mul_flow_table));
+    if (table == NULL) {
+        return NULL;
+    }
+
+    table->common = common;
+    table->hash_function = hash_function;
+    table->interval_count = interval_count;
+
+    gettimeofday(&table->common->last_update, NULL);
+
+    /* calloc zeroes the ring and checks the size multiplication for overflow. */
+    table->intervals = calloc(interval_count, sizeof(interval));
+    if (table->intervals == NULL) {
+        free(table);
+        return NULL;
+    }
+
+    table->intervals[0].valid = 1;
+    table->intervals[0].last_update = table->common->last_update;
+
+    /* Link the records into a ring. */
+    for (i = 0; i < interval_count; ++i) {
+        table->intervals[i].next = &table->intervals[(i + 1) % interval_count];
+    }
+
+    table->current_interval = &table->intervals[0];
+
+    return table;
+}
+
+/* Releases every flow on the table's linked list (including each flow's
+ * interval array), then the table's own interval array and the table. */
+void multiple_table_destroy(multiple_flow_table table) {
+    multiple_flow *node = table->flows_head;
+
+    while (node != NULL) {
+        /* Remember the successor before the node is freed. */
+        multiple_flow *following = node->next;
+
+        free(node->intervals);
+        free(node);
+        node = following;
+    }
+
+    free(table->intervals);
+    free(table);
+}
+
+/* Returns the table entry matching key's 5-tuple (addresses, ports,
+ * protocol), creating and registering a new entry when none exists.
+ *
+ * A new entry gets its own ring of table->interval_count interval records,
+ * with slot 0 marked valid and stamped from the key's packet time and the
+ * table's last update time. It is pushed on the front of its hash chain and
+ * appended to the tail of the table's flow list.
+ *
+ * Returns NULL when table is NULL or when an allocation fails. */
+multiple_flow *multiple_table_lookup(multiple_flow_table table, const key_flow *key) {
+    multiple_flow *entry;
+    uint32_t bucket;
+    size_t ring_bytes;
+    int slot;
+    struct in_addr from, to;
+    char from_text[22], to_text[22];
+
+    if (table == NULL) {
+        return NULL;
+    }
+
+    bucket = table->hash_function(key);
+
+    /* Walk the bucket's chain; an existing entry is returned untouched. */
+    entry = table->flows[bucket];
+    while (entry != NULL) {
+        if (entry->source_ip == key->source_ip &&
+            entry->dest_ip == key->dest_ip &&
+            entry->source_port == key->source_port &&
+            entry->dest_port == key->dest_port &&
+            entry->protocol == key->protocol) {
+            return entry;
+        }
+        entry = entry->nexth;
+    }
+
+    /* Not found: build a zeroed entry. */
+    entry = calloc(1, sizeof(multiple_flow));
+    if (entry == NULL) {
+        printlog(LOG_CRITICAL, "multipleinterval.c: Malloc returned NULL.\n");
+        return NULL;
+    }
+
+    ring_bytes = table->interval_count * sizeof(interval);
+    entry->intervals = malloc(ring_bytes);
+    if (entry->intervals == NULL) {
+        free(entry);
+        printlog(LOG_CRITICAL, "multipleinterval.c: Malloc returned NULL.\n");
+        return NULL;
+    }
+    memset(entry->intervals, 0, ring_bytes);
+
+    entry->source_ip = key->source_ip;
+    entry->dest_ip = key->dest_ip;
+    entry->source_port = key->source_port;
+    entry->dest_port = key->dest_port;
+    entry->protocol = key->protocol;
+
+    entry->intervals[0].valid = 1;
+    entry->intervals[0].last_update = table->common->last_update;
+    entry->intervals[0].last_packet = key->packet_time;
+
+    /* Link the interval records into a ring. */
+    for (slot = 0; slot < table->interval_count; ++slot) {
+        entry->intervals[slot].next = &entry->intervals[(slot + 1) % table->interval_count];
+    }
+
+    entry->current_interval = &entry->intervals[0];
+
+    /* Push onto the front of the hash chain. */
+    entry->nexth = table->flows[bucket];
+    table->flows[bucket] = entry;
+
+    /* Append to the table-wide list; next/prev start out NULL from calloc. */
+    if (table->flows_tail == NULL) {
+        table->flows_head = table->flows_tail = entry;
+    } else {
+        entry->prev = table->flows_tail;
+        table->flows_tail->next = entry;
+        table->flows_tail = entry;
+    }
+
+    /* inet_ntoa() reuses one static buffer, so copy each result out before
+     * formatting the next address. */
+    from.s_addr = ntohl(entry->source_ip);
+    to.s_addr = ntohl(entry->dest_ip);
+    strcpy(from_text, inet_ntoa(from));
+    strcpy(to_text, inet_ntoa(to));
+    printlog(LOG_DEBUG, "ALLOC:%s:%hu -> %s:%hu\n", from_text,
+             entry->source_port, to_text, entry->dest_port);
+
+    return entry;
+}
+
+/* Accounts one packet, described by key, against the table.
+ *
+ * The packet's size is first added to the table's current interval (the
+ * aggregate), which is marked valid. The matching flow is then looked up
+ * (and created if needed) and its current interval and last-packet times are
+ * updated as well.
+ *
+ * Returns 1 on success, or 0 when the flow could not be found or created; in
+ * that case the aggregate has already been updated. */
+int multiple_table_sample(multiple_flow_table table, const key_flow *key) {
+    multiple_flow *entry;
+
+    assert(table != NULL);
+    assert(table->common != NULL);
+
+    /* Aggregate accounting lives in the table's current interval. */
+    table->current_interval->valid = 1;
+    table->current_interval->bytes_since += key->packet_size;
+
+    entry = multiple_table_lookup(table, key);
+    if (entry == NULL) {
+        return 0;
+    }
+
+    /* The flow-level timestamp is what idle cleanup looks at. */
+    entry->last_packet = key->packet_time;
+
+    /* Per-flow accounting for the current interval. */
+    entry->current_interval->valid = 1;
+    entry->current_interval->last_packet = key->packet_time;
+    entry->current_interval->bytes_since += key->packet_size;
+
+    return 1;
+}
+
+/* Unlinks flow from the table's hash chain and from its doubly linked flow
+ * list, then zeroes and frees the flow's interval array and the flow itself.
+ * flow must be non-NULL and present in the table (both are asserted). */
+void multiple_table_remove(multiple_flow_table table, multiple_flow *flow) {
+ key_flow key;
+ uint32_t hash;
+
+ assert(flow);
+
+ /* Remove the flow from the hash list. */
+ /* Rebuild a key from the flow's 5-tuple to find its bucket. Only these
+ * five fields are set; presumably the hash function ignores the remaining
+ * key fields (e.g. packet_time, packet_size) - TODO confirm. */
+ key.source_ip = flow->source_ip;
+ key.dest_ip = flow->dest_ip;
+ key.source_port = flow->source_port;
+ key.dest_port = flow->dest_port;
+ key.protocol = flow->protocol;
+
+ hash = table->hash_function(&key);
+
+ assert(table->flows[hash]);
+
+ if (table->flows[hash] == flow) {
+ /* It's the head of the hash list. */
+ table->flows[hash] = flow->nexth;
+ } else {
+ multiple_flow *current, *prev;
+
+ prev = table->flows[hash];
+
+ /* Singly linked chain: track the predecessor while searching. */
+ for (current = table->flows[hash]->nexth; current; current = current->nexth) {
+ if (current == flow) {
+ prev->nexth = flow->nexth;
+ break;
+ } else {
+ prev = current;
+ }
+ }
+
+ /* current is NULL only if the flow was not on its expected chain. */
+ if (current == NULL) {
+ printlog(LOG_CRITICAL, "Flow %p disappeared?\n", flow);
+ }
+ assert(current != NULL);
+ }
+
+ /* Remove the flow from the linked list. */
+ if (flow->prev == NULL && flow->next == NULL) {
+ /* It's the head, tail, and only element of the list. */
+ assert(table->flows_head == flow);
+ assert(table->flows_tail == flow);
+
+ table->flows_head = NULL;
+ table->flows_tail = NULL;
+ } else if (flow->prev == NULL) {
+ /* It's the head of the list. */
+ assert(table->flows_head == flow);
+
+ table->flows_head = flow->next;
+
+ if (table->flows_head != NULL) {
+ table->flows_head->prev = NULL;
+ }
+ } else if (flow->next == NULL) {
+ /* It's the tail of the list. */
+ assert(table->flows_tail == flow);
+
+ table->flows_tail = flow->prev;
+
+ table->flows_tail->next = NULL;
+ } else {
+ /* Not the head or tail of the list. */
+ assert(table->flows_head != flow);
+
+ flow->prev->next = flow->next;
+
+ /* flow->next is known to be non-NULL in this branch. */
+ if (flow->next != NULL) {
+ flow->next->prev = flow->prev;
+ }
+ }
+
+ /* Free the interval info. */
+ /* Contents are zeroed before release so stale pointers read as NULL. */
+ memset(flow->intervals, 0, table->interval_count * sizeof(interval));
+ free(flow->intervals);
+
+ /* Free the flow. */
+ memset(flow, 0, sizeof(multiple_flow));
+ free(flow);
+}
+
+/* Walks the table's flow list and removes every flow whose last packet is at
+ * least MUL_FLOW_IDLE_TIME old relative to the current time. Always
+ * returns 0. */
+int multiple_table_cleanup(multiple_flow_table table) {
+    multiple_flow *flow = table->flows_head;
+    const time_t now = time(NULL);
+
+    while (flow != NULL) {
+        /* Grab the successor first: removal frees the current node. */
+        multiple_flow *following = flow->next;
+
+        if (flow->last_packet + MUL_FLOW_IDLE_TIME <= now) {
+            /* Flow hasn't received a packet in the time limit - kill it. */
+            multiple_table_remove(table, flow);
+        }
+
+        flow = following;
+    }
+
+    return 0;
+}
+
+/* Starting just after newest and following the ring's next pointers, returns
+ * the first interval marked valid. If the walk comes back around to newest
+ * without finding one, newest itself is returned. */
+static interval *get_oldest_interval(interval *newest) {
+    interval *cursor;
+
+    for (cursor = newest->next; cursor != newest; cursor = cursor->next) {
+        if (cursor->valid) {
+            return cursor;
+        }
+    }
+
+    return newest;
+}
+
+/* Sums bytes_since over the ring segment that starts at oldest and ends at
+ * newest, both inclusive, following next pointers. When oldest == newest the
+ * result is just newest's byte count. */
+static uint32_t get_bytes_over_interval(interval *newest, interval *oldest) {
+    uint32_t total = newest->bytes_since;
+    interval *cursor;
+
+    for (cursor = oldest; cursor != newest; cursor = cursor->next) {
+        total += cursor->bytes_since;
+    }
+
+    return total;
+}
+
+/**
+ * Recomputes the aggregate rate and every per-flow rate from the bytes
+ * recorded over the window of valid intervals, then rotates each interval
+ * ring forward by one slot.
+ *
+ * For the table and for each flow the unweighted rate is
+ * (bytes across valid intervals) / (now - oldest interval's last_update).
+ * A previous rate of zero is replaced outright; otherwise the new rate is
+ * old * ewma_weight + unweighted * (1 - ewma_weight).
+ *
+ * The table's common statistics (flow counts per rate bucket, avg_rate,
+ * max_flow_rate and max_flow_rate_flow_hash) are rebuilt on every call.
+ *
+ * @param table        The table to update.
+ * @param now          The current time; stamped on the newly current
+ *                     intervals and used as the end of the rate window.
+ * @param ewma_weight  Weight given to the previous rate estimate.
+ */
+void multiple_table_update_flows(multiple_flow_table table, struct timeval now, double ewma_weight) {
+    uint32_t maxflowrate = 0;
+    double time_delta;
+    double unweighted_rate;
+    multiple_flow *current;
+    struct in_addr src, dst;
+    char sip[22], dip[22];
+    key_flow largest_flow_info;
+
+    /* Table interval variables. */
+    interval *table_newest = NULL;
+    interval *table_oldest = NULL;
+    uint32_t table_bytes_over_intervals = 0;
+
+    /* The key is hashed unconditionally at the end of this function. Zero it
+     * so that the hash is deterministic when no flow has a non-zero rate and
+     * so that any key fields not assigned below hold a defined value. */
+    memset(&largest_flow_info, 0, sizeof(largest_flow_info));
+
+    /* Reset statistics. */
+    table->common->num_flows = 0;
+    table->common->num_flows_5k = 0;
+    table->common->num_flows_10k = 0;
+    table->common->num_flows_20k = 0;
+    table->common->num_flows_50k = 0;
+    table->common->avg_rate = 0;
+    /* End statistics. */
+
+    table_newest = table->current_interval;
+    table_oldest = get_oldest_interval(table_newest);
+
+    table_bytes_over_intervals = get_bytes_over_interval(table_newest, table_oldest);
+
+    time_delta = timeval_subtract(now, table_oldest->last_update);
+
+    if (time_delta <= 0) {
+        unweighted_rate = 0;
+    } else {
+        unweighted_rate = table_bytes_over_intervals / time_delta;
+    }
+
+    table->common->last_inst_rate = table->common->inst_rate;
+    table->common->inst_rate = unweighted_rate;
+    printf("Unweighted rate is: %.3f, computed from %u bytes in %f seconds\n", unweighted_rate, table_bytes_over_intervals, time_delta);
+
+    table->common->last_rate = table->common->rate;
+
+    /* If the rate is zero, then we don't know anything yet. Don't apply EWMA
+     * in that case. */
+    if (table->common->rate == 0) {
+        table->common->rate = unweighted_rate;
+    } else {
+        //FIXME: Continue to use ewma here?
+        table->common->rate = table->common->rate * ewma_weight + unweighted_rate * (1 - ewma_weight);
+    }
+
+    /* Rotate the table-wide ring: the next slot becomes current and starts
+     * counting from zero as of now. */
+    table->common->last_update = now;
+    table->current_interval = table->current_interval->next;
+    table->current_interval->last_update = now;
+    table->current_interval->bytes_since = 0;
+    table->current_interval->valid = 1;
+
+    /* Update per-flow information. */
+    for (current = table->flows_head; current; current = current->next) {
+        interval *newest = current->current_interval;
+        interval *oldest;
+        uint32_t bytes_over_intervals;
+
+        /* This flow is invalid - don't consider it further. */
+        if (newest->valid == 0) {
+            printlog(LOG_WARN, "Found invalid flow in table.\n");
+            continue;
+        }
+
+        oldest = get_oldest_interval(newest);
+
+        time_delta = timeval_subtract(now, oldest->last_update);
+        bytes_over_intervals = get_bytes_over_interval(newest, oldest);
+
+        if (time_delta <= 0) {
+            unweighted_rate = 0;
+        } else {
+            unweighted_rate = bytes_over_intervals / time_delta;
+        }
+
+        current->last_rate = current->rate;
+
+        if (current->rate == 0) {
+            current->rate = unweighted_rate;
+        } else {
+            //FIXME: Continue to use ewma here?
+            current->rate = current->rate * ewma_weight + unweighted_rate * (1 - ewma_weight);
+        }
+
+        /* Update the accounting info for intervals. */
+        current->current_interval = current->current_interval->next;
+        current->current_interval->last_update = now;
+        current->current_interval->bytes_since = 0;
+        current->current_interval->valid = 1;
+
+        /* Remember the identifying fields of the fastest flow seen so far. */
+        if (current->rate > maxflowrate) {
+            maxflowrate = current->rate;
+            largest_flow_info.source_ip = current->source_ip;
+            largest_flow_info.dest_ip = current->dest_ip;
+            largest_flow_info.source_port = current->source_port;
+            largest_flow_info.dest_port = current->dest_port;
+            largest_flow_info.protocol = current->protocol;
+        }
+
+        /* The rate buckets are cumulative: a flow above a threshold is also
+         * counted in every lower bucket. */
+        if (current->rate > 51200) {
+            table->common->num_flows_50k += 1;
+            table->common->num_flows_20k += 1;
+            table->common->num_flows_10k += 1;
+            table->common->num_flows_5k += 1;
+            table->common->num_flows += 1;
+        } else if (current->rate > 20480) {
+            table->common->num_flows_20k += 1;
+            table->common->num_flows_10k += 1;
+            table->common->num_flows_5k += 1;
+            table->common->num_flows += 1;
+        } else if (current->rate > 10240) {
+            table->common->num_flows_10k += 1;
+            table->common->num_flows_5k += 1;
+            table->common->num_flows += 1;
+        } else if (current->rate > 5120) {
+            table->common->num_flows_5k += 1;
+            table->common->num_flows += 1;
+        } else {
+            table->common->num_flows += 1;
+        }
+
+        /* inet_ntoa() returns a static buffer, so each address is copied out
+         * before the next call. */
+        src.s_addr = ntohl(current->source_ip);
+        dst.s_addr = ntohl(current->dest_ip);
+        strcpy(sip, inet_ntoa(src));
+        strcpy(dip, inet_ntoa(dst));
+        printlog(LOG_DEBUG, "FLOW: (%p) %s:%d -> %s:%d at %u\n", (void *) current,
+                 sip, current->source_port,
+                 dip, current->dest_port,
+                 current->rate);
+    }
+
+    if (table->common->num_flows > 0) {
+        table->common->avg_rate = table->common->rate / table->common->num_flows;
+    }
+
+    printlog(LOG_DEBUG, "FLOW:--\n--\n");
+
+    table->common->max_flow_rate = maxflowrate;
+    table->common->max_flow_rate_flow_hash = table->hash_function(&largest_flow_info);
+}
--- /dev/null
+/* See the DRL-LICENSE file for this file's software license. */
+
+/*
+ * Defines the multiple-interval "perfect" flow accounting table.
+ *
+ */
+
+#ifndef _MULTIPLE_ACCOUNTING_H_
+#define _MULTIPLE_ACCOUNTING_H_
+
+#include <inttypes.h>
+
+//FIXME: Update comments on structure variables
+
+/** The number of hash buckets in the table. multiple_hasher() hands this
+ * value to hash_flow(), which reduces the hash by masking with
+ * (size - 1), so it must remain a power of two. */
+#define MUL_FLOW_HASH_SIZE 1024
+
+/** The number of seconds after which a flow is considered to be inactive.
+ * Inactive flows will be removed from the table during the next call to the
+ * cleanup function. */
+#define MUL_FLOW_IDLE_TIME 15
+
+/** The interval count that the identity setup code passes to
+ * multiple_table_create() when it builds an ACT_MULTIPLE table. */
+#define MUL_INTERVAL_COUNT 10
+
+/** One slot in a circular list of accounting intervals. The same structure
+ * is used for per-flow history and for the table-wide aggregate history. */
+typedef struct mul_interval {
+ /** The number of bytes accounted to this interval's owner (a flow, or the
+ * table as a whole) since the last update to this interval structure.
+ * Reset to zero when the interval becomes the current one. */
+ uint32_t bytes_since;
+
+ /** The time at which this interval was last updated. This is stamped
+ * when the interval becomes current, and the oldest valid interval's
+ * stamp is used as the start of the rate-calculation window. */
+ struct timeval last_update;
+
+ /** The time at which the most recent packet in this flow was received
+ * in this interval. */
+ time_t last_packet;
+
+ /** Non-zero once this interval has been used as a current interval.
+ * Intervals with valid == 0 are skipped when searching for the oldest
+ * interval. */
+ uint32_t valid;
+
+ /** The next interval in the ring. The list is circular: following next
+ * repeatedly leads back to the starting interval. */
+ struct mul_interval *next;
+} interval;
+
+/** Representation of a flow in a multiple-interval table. */
+typedef struct mul_flow {
+ /* Flow information. */
+
+ /** The flow's current rate estimate. multiple_table_update_flows()
+ * recomputes it from the bytes recorded across the flow's valid
+ * intervals, smoothed with the EWMA weight. */
+ uint32_t rate;
+
+ /** The value rate held before the most recent update. */
+ uint32_t last_rate;
+
+ /** NOTE(review): presumably the time the flow's most recent packet was
+ * seen, compared against MUL_FLOW_IDLE_TIME by the cleanup function -
+ * confirm against multiple_table_sample()/multiple_table_cleanup(). */
+ time_t last_packet;
+
+ /** The interval currently being filled for this flow. Each call to
+ * multiple_table_update_flows() advances it to the next interval in the
+ * ring. */
+ interval *current_interval;
+
+ /** NOTE(review): presumably the start of this flow's interval storage
+ * (the ring that current_interval walks) - confirm against the table
+ * lookup/allocation code. */
+ interval *intervals;
+
+ /* Identification information. */
+
+ /** The flow's source IP address. */
+ uint32_t source_ip;
+
+ /** The flow's destination IP address. */
+ uint32_t dest_ip;
+
+ /** The flow's source port. */
+ uint16_t source_port;
+
+ /** The flow's destination port. */
+ uint16_t dest_port;
+
+ /** The flow's protocol. This corresponds to the protocol field of the IP
+ * header. */
+ uint8_t protocol;
+
+ /* Table state. */
+
+ /** Pointer to the next flow in the hash list. */
+ struct mul_flow *nexth;
+
+ /** Pointers to the next flow in the linked list. */
+ struct mul_flow *next;
+
+ /** Pointers to the previous flow in the linked list. */
+ struct mul_flow *prev;
+
+} multiple_flow;
+
+/**
+ * The "table" that stores the flows. It's constructed of two main pieces.
+ *
+ * The first is an array of hash buckets. Flows are classified into buckets
+ * by hashing the flow's values (key flow) using the table's hash function.
+ * Flows are chained together as a list in each bucket to deal with collisions.
+ *
+ * The second is a simple doubly linked list containing every flow in the table.
+ *
+ * In addition, the table keeps its own ring of intervals that records the
+ * aggregate history used to compute the table-wide rate.
+ */
+struct mul_flow_table {
+
+ /** Pointer to the common flow information for the identity that owns this
+ * multiple-interval flow table. This is updated with aggregate
+ * information (rates, flow counts, largest flow) on every call to
+ * multiple_table_update_flows(). */
+ common_accounting_t *common;
+
+ /** NOTE(review): presumably the number of intervals in each ring, as
+ * passed to multiple_table_create() - confirm against the create code. */
+ uint32_t interval_count;
+
+ /** The table-wide interval currently being filled. Advanced to the next
+ * interval in the ring by multiple_table_update_flows(). */
+ interval *current_interval;
+
+ /** NOTE(review): presumably the start of the table-wide interval storage
+ * (the ring that current_interval walks) - confirm against the create
+ * code. */
+ interval *intervals;
+
+ /* Table structures. */
+
+ /** Hash buckets - each is a list of flows. */
+ struct mul_flow *flows[MUL_FLOW_HASH_SIZE];
+
+ /** The head of the linked list of flows. */
+ struct mul_flow *flows_head;
+
+ /** The tail of the linked list of flows. */
+ struct mul_flow *flows_tail;
+
+ /** Function pointer to the function that will yield the hash of a
+ * key_flow.
+ */
+ uint32_t (*hash_function)(const key_flow *key);
+
+};
+
+/** The type multiple_flow_table is really a pointer to a struct
+ * mul_flow_table. */
+typedef struct mul_flow_table *multiple_flow_table;
+
+/**
+ * Creates a new table that will use the specified hash function.
+ *
+ * hash_function is the function the table uses to hash a key_flow.
+ * interval_count is presumably the number of history intervals kept per
+ * ring (callers in this tree pass MUL_INTERVAL_COUNT) - confirm against the
+ * implementation.
+ * common points to the owning identity's common accounting structure, which
+ * the table updates with aggregate information.
+ *
+ * Returns the new table or NULL on failure.
+ */
+multiple_flow_table multiple_table_create(uint32_t (*hash_function)(const key_flow *key), uint32_t interval_count, common_accounting_t *common);
+
+/**
+ * Destroys the specified table.
+ */
+void multiple_table_destroy(multiple_flow_table table);
+
+/**
+ * Looks for a flow that matches the given key in the specified table. If a
+ * matching flow is not found, this function will allocate a new flow for the
+ * key.
+ *
+ * Returns a pointer to the flow that matches the key or NULL if there is no
+ * matching flow and a new flow couldn't be allocated.
+ */
+multiple_flow *multiple_table_lookup(multiple_flow_table table, const key_flow *key);
+
+/**
+ * Updates the state of the table given a newly acquired packet.
+ *
+ * Returns 1 if the flow is in the table, or 0 otherwise (indicating that
+ * memory could not be allocated to add a new flow to the table for the given
+ * key).
+ */
+int multiple_table_sample(multiple_flow_table table, const key_flow *key);
+
+/**
+ * Cleans the table by removing flow entries for any flow that hasn't seen a
+ * new packet in the interval specified by MUL_FLOW_IDLE_TIME seconds.
+ */
+int multiple_table_cleanup(multiple_flow_table table);
+
+/**
+ * Updates the rate information for all flows in the table according to the
+ * specified current time and EWMA weight.
+ */
+void multiple_table_update_flows(multiple_flow_table table, struct timeval now, double ewma_weight);
+
+#endif /* _MULTIPLE_ACCOUNTING_H_ */
#include "peer_comm.h"
#include "logging.h"
+/* Artificially makes a network partition. */
+int do_partition = 0;
+int partition_set = 0xfffffff;
+
extern limiter_t limiter;
static const uint32_t MAGIC_MSG = 0x123123;
}
#endif
+#define ALLOW_PARTITION
+
int send_udp_mesh(comm_t *comm, uint32_t id, int sock) {
int result = 0;
remote_limiter_t *remote;
message_t msg;
struct sockaddr_in toaddr;
+ int i;
+
+#ifdef ALLOW_PARTITION
+
+ int partition_count = 0;
+ struct in_addr dest;
+ char dest_ip[22];
+
+#endif
memset(&toaddr, 0, sizeof(struct sockaddr_in));
toaddr.sin_family = AF_INET;
message_to_nbo(&msg);
/* Iterate though and send update to all remote limiters in our identity. */
- map_reset_iterate(comm->remote_node_map);
- while ((remote = map_next(comm->remote_node_map))) {
+ for (i = 0; i < comm->remote_node_count; ++i) {
+ remote = &comm->remote_limiters[i];
+
+#ifdef ALLOW_PARTITION
+
+ if (do_partition) {
+ printlog(LOG_DEBUG, "Testing partition, partition set is %x, count is %d, test is %d.\n",
+ partition_set, partition_count, partition_set & (1 << partition_count));
+ /* If the partition count bit isn't high in the set, don't actually send anything. */
+ if ((partition_set & (1 << partition_count)) == 0) {
+ dest.s_addr = ntohl(remote->addr);
+ strcpy(dest_ip, inet_ntoa(dest));
+
+ printlog(LOG_DEBUG, "Partition: ignoring host %s\n", dest_ip);
+
+ partition_count += 1;
+ continue;
+ }
+ }
+
+#endif
+
toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
toaddr.sin_port = remote->port;
if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
printlog(LOG_WARN, " - The error was |%d|\n", strerror(result));
break;
}
+ partition_count += 1;
}
return result;
for (i = 0; i < comm->gossip.gossip_branch; ++i) {
message_t msg;
+ printlog(LOG_DEBUG, "Gossip loop iteration, i=%d, branch=%d\n", i, comm->gossip.gossip_branch);
+
if (comm->retrys[i] >= 0) {
remote = &comm->remote_limiters[comm->retrys[i]];
targetid = comm->retrys[i];
enum policies { POLICY_GRD = 1, POLICY_FPS = 2 };
enum commfabrics { COMM_MESH = 1, COMM_GOSSIP = 2 };
-enum accountings { ACT_STANDARD = 1, ACT_SAMPLEHOLD = 2, ACT_SIMPLE = 3 };
+enum accountings { ACT_STANDARD = 1, ACT_SAMPLEHOLD = 2, ACT_SIMPLE = 3, ACT_MULTIPLE = 4};
+enum dampenings { DAMPEN_NONE = 0, DAMPEN_TEST = 1, DAMPEN_FAILED = 2, DAMPEN_PASSED = 3, DAMPEN_SKIP = 4};
/* The comm library also has definitions for comfabrics. This prevents us
* from defining them twice. */
*/
#define FLOWKEYSIZE (13)
+/* Causes each identity to track every flow in two tables. One table is as
+ * specified in the config file. The second is a standard table with
+ * "perfect" accounting so that we can compare the two. Turn this off for
+ * any type of production setting. */
+#define SHADOW_ACCTING
+
/* forward declare some structs */
struct limiter;
struct identity;
#include "rate_accounting/simple.h"
#endif
-#include "calendar.h"
+#include "bsd_queue.h"
#include "config.h"
#include "drl_state.h"
#include "common_accounting.h"
#include "standard.h"
#include "samplehold.h"
#include "simple.h"
+#include "multipleinterval.h"
/** Represents a DRL entitiy/group. */
* flows to grow before incurring losses. */
int flowstart;
+ /** Keeps track of the state of FPS dampening for this identity. */
+ enum dampenings dampen_state;
+
/* GRD */
/** GRD drop probability information. */
/** Function to call when the table should be destroyed. */
void (*table_destroy_function)(void *);
+#ifdef SHADOW_ACCTING
+
+ common_accounting_t shadow_common;
+
+ void *shadow_table;
+
+ double localweight_copy;
+ double last_localweight_copy;
+
+ enum dampenings dampen_state_copy;
+
+#endif
+
/* Scheduling bookkeeping. */
- /* Pointers to other identities in the scheduling calendar. */
- TAILQ_ENTRY(identity) calendar;
+ /** Scheduling unit that tells the limiter when to execute the main loop.*/
+ struct ident_action *loop_action;
- /* The number of limiter ticks at which this identity should be scheduled.
- */
- uint32_t intervals;
+ /** Scheduling unit that tells the limiter when to communicate.*/
+ struct ident_action *comm_action;
+
+ /** The number of limiter ticks that should pass before this identity should
+ * be scheduled to execute its main estimate/allocate/enforce loop. */
+ uint32_t mainloop_intervals;
+
+ /** The number of limiter ticks that should pass before this identity should
+ * be scheduled for communication. */
+ uint32_t communication_intervals;
} identity_t;
+/** The kinds of work that can be scheduled for an identity. */
+enum identity_actions { ACTION_MAINLOOP = 1, ACTION_COMMUNICATE = 2 };
+
+/** A schedulable unit of work for one identity. These structures, rather
+ * than the identities themselves, are the elements linked into the
+ * scheduling calendar. */
+typedef struct ident_action {
+ /** The identity this action applies to. */
+ struct identity *ident;
+ /** Which kind of work this entry represents. */
+ enum identity_actions action;
+ /** Set to 1 when the action is created and cleared to 0 when its identity
+ * is freed; presumably lets the scheduler discard entries whose identity
+ * no longer exists - confirm against the calendar processing code. */
+ int valid;
+
+ /** Queue linkage for the calendar slot this action is inserted into. */
+ TAILQ_ENTRY(ident_action) calendar;
+} identity_action;
+
/**
* Represents the bottom most entity in the HTB hierarchy. For PlanetLab,
* this corresponds to sliver (identified by Vserver context id, or xid).
/* See the DRL-LICENSE file for this file's software license. */
+#include <arpa/inet.h>
#include <inttypes.h>
+#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
+#include <sys/types.h>
#include <time.h>
#include "common_accounting.h"
return NULL;
}
- table->capacity = (uint32_t) ((base_size * oversampling_factor) * 1.03);
+ table->capacity = (uint32_t) (base_size * oversampling_factor);
table->size = 0;
table->hash_function = hash_function;
table->sample_prob = (double) (((double) table->capacity / (double) max_bytes) * (double) RANDOM_GRANULARITY);
table->threshold = (double) ((double) flow_percentage / 100) * max_bytes;
+
+ /* Allocate the backing and give it a little bit extra to deal with variance. */
table->largest = NULL;
- table->backing = malloc(sizeof(sampled_flow) * table->capacity);
+ table->backing = malloc(sizeof(sampled_flow) * table->capacity * 1.05);
if (table->backing == NULL) {
free(table);
uint32_t rate_delta = 0;
double time_delta = 0;
double unweighted_rate = 0;
+ struct in_addr src, dst;
+ char sip[22], dip[22];
/* Update common aggregate information. */
time_delta = timeval_subtract(now, table->common->last_update);
unweighted_rate * (1 - ewma_weight);
}
+ printlog(LOG_DEBUG, "table->common->rate is now %u\n", table->common->rate);
+
table->common->bytes_since = 0;
table->common->last_update = now;
+ table->common->num_flows = 0;
/* Update per-flow information. */
table->largest = &table->backing[i];
largest_rate = table->backing[i].rate;
table->largest = &table->backing[i];
}
+
+ table->common->num_flows += 1;
+
+ /* Print debugging info. */
+ src.s_addr = ntohl(table->backing[i].source_ip);
+ dst.s_addr = ntohl(table->backing[i].dest_ip);
+ strcpy(sip, inet_ntoa(src));
+ strcpy(dip, inet_ntoa(dst));
+ printlog(LOG_DEBUG, "FLOW: (%p) %s:%d -> %s:%d at %d\n", &table->backing[i],
+ sip, table->backing[i].source_port,
+ dip, table->backing[i].dest_port,
+ table->backing[i].rate);
}
}
#define FLOW_DELETED 1
#define FLOW_USED 2
-#define RANDOM_GRANULARITY 1000
+#define RANDOM_GRANULARITY (1000)
+
+#define SAMPLEHOLD_PERCENTAGE (5)
+#define SAMPLEHOLD_OVERFACTOR (10)
/** In-table representation of a flow that has been sampled. */
typedef struct sampled_flow {
time_t now = time(NULL);
while (current != NULL) {
- if (current->last_packet + FLOW_IDLE_TIME <= now) {
+ if (current->last_packet + STD_FLOW_IDLE_TIME <= now) {
/* Flow hasn't received a packet in the time limit - kill it. */
remove = current;
current = current->next;
#include <inttypes.h>
/** The number of hash buckets in the table. */
-#define FLOW_HASH_SIZE 1024
+#define STD_FLOW_HASH_SIZE 1024
/** The number of seconds after which a flow is considered to be inactive.
* Inactive flows will be removed from the table during the next call to the
* cleanup function. */
-#define FLOW_IDLE_TIME 15
+#define STD_FLOW_IDLE_TIME 15
/** Representation of a flow in a standard table. */
typedef struct std_flow {
/* Table structures. */
/** Hash buckets - each is a list of flows. */
- struct std_flow *flows[FLOW_HASH_SIZE];
+ struct std_flow *flows[STD_FLOW_HASH_SIZE];
/** The head of the linked list of flows. */
struct std_flow *flows_head;
/**
* Cleans the table by removing flow entires for any flow that hasn't seen a
- * new packet in the interval specified by FLOW_IDLE_TIME seconds.
+ * new packet in the interval specified by STD_FLOW_IDLE_TIME seconds.
*/
int standard_table_cleanup(standard_flow_table table);
/* DRL specifics */
#include "raterouter.h"
#include "util.h"
-#include "calendar.h"
#include "ratetypes.h" /* needs util and pthread.h */
+#include "calendar.h"
#include "logging.h"
/*
* Add the config options for DRL.
*/
-static config_entry_t drl_configfile = {
+static config_entry_t partition = {
.next = NULL,
+ .key = "partition_set",
+ .type = CONFIG_TYPE_INT,
+ .options = CONFIG_OPT_NONE,
+ .u = { .value = 0xfffffff },
+};
+
+static config_entry_t netem_slice = {
+ .next = &partition,
+ .key = "netem_slice",
+ .type = CONFIG_TYPE_STRING,
+ .options = CONFIG_OPT_NONE,
+ .u = { .string = "ALL" },
+};
+
+static config_entry_t netem_loss = {
+ .next = &netem_slice,
+ .key = "netem_loss",
+ .type = CONFIG_TYPE_INT,
+ .options = CONFIG_OPT_NONE,
+ .u = { .value = 0 },
+};
+
+static config_entry_t netem_delay = {
+ .next = &netem_loss,
+ .key = "netem_delay",
+ .type = CONFIG_TYPE_INT,
+ .options = CONFIG_OPT_NONE,
+ .u = { .value = 0 },
+};
+
+static config_entry_t drl_configfile = {
+ .next = &netem_delay,
.key = "drl_configfile",
.type = CONFIG_TYPE_STRING,
.options = CONFIG_OPT_MANDATORY,
extern uint8_t system_loglevel;
extern uint8_t do_enforcement;
+/* From peer_comm.c - used to simulate partition. */
+extern int do_partition;
+extern int partition_set;
+
/* functions */
static inline uint32_t
-hash_flow(uint8_t protocol, uint32_t src_ip, uint16_t src_port, uint32_t dst_ip, uint16_t dst_port)
+hash_flow(uint8_t protocol, uint32_t src_ip, uint16_t src_port, uint32_t dst_ip, uint16_t dst_port, uint32_t hash_max)
{
unsigned char mybytes[FLOWKEYSIZE];
mybytes[0] = protocol;
*(uint32_t*)(&(mybytes[1])) = src_ip;
*(uint32_t*)(&(mybytes[5])) = dst_ip;
*(uint32_t*)(&(mybytes[9])) = (src_port << 16) | dst_port;
- return jhash(mybytes,FLOWKEYSIZE,salt) & (FLOW_HASH_SIZE - 1);
+ return jhash(mybytes,FLOWKEYSIZE,salt) & (hash_max - 1);
}
uint32_t sampled_hasher(const key_flow *key) {
- return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port);
+    /* The sampled flow table keeps track of its own capacity, so the full
+     * 32-bit hash should be returned here. hash_flow() masks the hash with
+     * (hash_max - 1): a hash_max of 0 wraps to an all-ones mask, whereas
+     * UINT_MAX would give a mask of 0xfffffffe and clear the low bit of
+     * every hash. */
+    return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, 0);
}
uint32_t standard_hasher(const key_flow *key) {
- return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port);
+ return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, STD_FLOW_HASH_SIZE);
+}
+
+uint32_t multiple_hasher(const key_flow *key) {
+ return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, MUL_FLOW_HASH_SIZE);
}
struct intr_id {
/* Update the identity's table. */
ident->table_sample_function(ident->table, &key);
+#ifdef SHADOW_ACCTING
+
+ /* Update the shadow perfect copy of the accounting table. */
+ standard_table_sample((standard_flow_table) ident->shadow_table, &key);
+
+#endif
+
pthread_mutex_unlock(&ident->table_mutex);
ident = ident->parent;
ident->table_destroy_function(ident->table);
}
+ if (ident->loop_action) {
+ ident->loop_action->valid = 0;
+ }
+
+ if (ident->comm_action) {
+ ident->comm_action->valid = 0;
+ }
+
pthread_mutex_destroy(&ident->table_mutex);
free(ident);
free(instance->machines);
if (instance->sets)
free(instance->sets);
+
+ /* FIXME: Drain the calendar first and free all the entries. */
if (instance->cal) {
free(instance->cal);
}
ident->id = config->id;
ident->limit = (uint32_t) (((double) config->limit * 1000.0) / 8.0);
ident->fixed_ewma_weight = config->fixed_ewma_weight;
- ident->intervals = config->intervals;
+ ident->communication_intervals = config->communication_intervals;
+ ident->mainloop_intervals = config->mainloop_intervals;
ident->ewma_weight = pow(ident->fixed_ewma_weight,
- (limiter.estintms/1000.0) * config->intervals);
+ (limiter.estintms/1000.0) * config->mainloop_intervals);
ident->parent = NULL;
pthread_mutex_init(&ident->table_mutex, NULL);
(void (*)(void *)) standard_table_destroy;
break;
+ case ACT_MULTIPLE:
+ ident->table =
+ multiple_table_create(multiple_hasher, MUL_INTERVAL_COUNT, &ident->common);
+
+ ident->table_sample_function =
+ (int (*)(void *, const key_flow *)) multiple_table_sample;
+ ident->table_cleanup_function =
+ (int (*)(void *)) multiple_table_cleanup;
+ ident->table_update_function =
+ (void (*)(void *, struct timeval, double)) multiple_table_update_flows;
+ ident->table_destroy_function =
+ (void (*)(void *)) multiple_table_destroy;
+ break;
+
case ACT_SAMPLEHOLD:
ident->table = sampled_table_create(sampled_hasher,
ident->limit * IDENT_CLEAN_INTERVAL,
- 1, 20, &ident->common);
+ SAMPLEHOLD_PERCENTAGE, SAMPLEHOLD_OVERFACTOR, &ident->common);
ident->table_sample_function =
(int (*)(void *, const key_flow *)) sampled_table_sample;
break;
}
+#ifdef SHADOW_ACCTING
+
+ ident->shadow_table = standard_table_create(standard_hasher, &ident->shadow_common);
+
+ if (ident->shadow_table == NULL) {
+ ident->table_destroy_function(ident->table);
+ free(ident);
+ return NULL;
+ }
+
+#endif
+
/* Make sure the table was allocated. */
if (ident->table == NULL) {
free(ident);
}
/* Accounting must be a valid choice (ACT_STANDARD, ACT_SAMPLEHOLD,
- * ACT_SIMPLE). */
+ * ACT_SIMPLE, ACT_MULTIPLE). */
if (config->accounting != ACT_STANDARD &&
config->accounting != ACT_SAMPLEHOLD &&
- config->accounting != ACT_SIMPLE) {
+ config->accounting != ACT_SIMPLE &&
+ config->accounting != ACT_MULTIPLE) {
return 1;
}
/* Allocate and add the machine identities. */
for (i = 0; i < configs.machine_count; ++i) {
+ identity_action *loop_action;
+ identity_action *comm_action;
instance->machines[i] = new_identity(config);
if (instance->machines[i] == NULL) {
return ENOMEM;
}
+ loop_action = malloc(sizeof(identity_action));
+ comm_action = malloc(sizeof(identity_action));
+
+ if (loop_action == NULL || comm_action == NULL) {
+ return ENOMEM;
+ }
+
/* The first has no parent - it is the root. All others have the
* previous ident as their parent. */
if (i == 0) {
config = config->next;
+ memset(loop_action, 0, sizeof(identity_action));
+ memset(comm_action, 0, sizeof(identity_action));
+ loop_action->ident = instance->machines[i];
+ loop_action->action = ACTION_MAINLOOP;
+ loop_action->valid = 1;
+ comm_action->ident = instance->machines[i];
+ comm_action->action = ACTION_COMMUNICATE;
+ comm_action->valid = 1;
+
+ instance->machines[i]->loop_action = loop_action;
+ instance->machines[i]->comm_action = comm_action;
+
TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
- instance->machines[i], calendar);
+ loop_action, calendar);
+
+ TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
+ comm_action, calendar);
/* Setup the array of pointers to leaves. This is easy for machines
* because a machine node applies to every leaf. */
/* Sets... */
for (i = 0; i < instance->set_count; ++i) {
+ identity_action *loop_action;
+ identity_action *comm_action;
+
if (instance->sets[i]->parent == NULL) {
instance->sets[i]->parent = instance->last_machine;
}
+ loop_action = malloc(sizeof(identity_action));
+ comm_action = malloc(sizeof(identity_action));
+
+ if (loop_action == NULL || comm_action == NULL) {
+ return ENOMEM;
+ }
+
+ memset(loop_action, 0, sizeof(identity_action));
+ memset(comm_action, 0, sizeof(identity_action));
+ loop_action->ident = instance->sets[i];
+ loop_action->action = ACTION_MAINLOOP;
+ loop_action->valid = 1;
+ comm_action->ident = instance->sets[i];
+ comm_action->action = ACTION_COMMUNICATE;
+ comm_action->valid = 1;
+
+ instance->sets[i]->loop_action = loop_action;
+ instance->sets[i]->comm_action = comm_action;
+
+ TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
+ loop_action, calendar);
+
TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
- instance->sets[i], calendar);
+ comm_action, calendar);
/* Setup the array of pointers to leaves. This is harder for sets,
* but this doesn't need to be super-efficient because it happens
static int assign_htb_hierarchy(drl_instance_t *instance) {
int i, j;
- int next_node = 0x11;
+ int next_node = 0x100;
/* Chain machine nodes under 1:10. */
for (i = 0; i < instance->machine_count; ++i) {
/* Added this so that I could comment one line and kill off all of the
* command execution. */
-static int execute_cmd(const char *cmd) {
+static inline int execute_cmd(const char *cmd) {
return system(cmd);
}
+/**
+ * Builds and runs a "tc class add" command that creates the HTB class
+ * classid_major:classid_minor under parent_major:parent_minor on iface.
+ * All class numbers are formatted in hexadecimal; rate and ceil are
+ * formatted with a "bit" suffix.
+ *
+ * Returns the result of execute_cmd() for the command, or 1 if the command
+ * does not fit in the local buffer (nothing is executed in that case).
+ */
+static inline int add_htb_node(const char *iface, const uint32_t parent_major, const uint32_t parent_minor,
+                               const uint32_t classid_major, const uint32_t classid_minor,
+                               const uint64_t rate, const uint64_t ceil) {
+    char cmd[300];
+    int len;
+
+    /* uint64_t is not necessarily unsigned long long, so cast to match the
+     * %llu conversions. */
+    len = snprintf(cmd, sizeof(cmd),
+                   "tc class add dev %s parent %x:%x classid %x:%x htb rate %llubit ceil %llubit",
+                   iface, parent_major, parent_minor, classid_major, classid_minor,
+                   (unsigned long long) rate, (unsigned long long) ceil);
+
+    /* Never run a truncated command. */
+    if (len < 0 || (size_t) len >= sizeof(cmd)) {
+        printlog(LOG_CRITICAL, "INIT: HTB_cmd: tc class command too long.\n");
+        return 1;
+    }
+
+    printlog(LOG_WARN, "INIT: HTB_cmd: %s\n", cmd);
+
+    return execute_cmd(cmd);
+}
+
+/**
+ * Replaces the qdisc attached to class parent_major:parent_minor on iface
+ * with a netem qdisc that applies the given loss percentage and delay (in
+ * milliseconds).
+ *
+ * A pfifo qdisc with the given handle is deleted first; failure of that
+ * deletion is only logged. Returns the result of execute_cmd() for the
+ * netem "replace" command, or 1 if a command does not fit in the local
+ * buffer (that command is not executed).
+ */
+static inline int add_htb_netem(const char *iface, const uint32_t parent_major,
+                                const uint32_t parent_minor, const uint32_t handle,
+                                const int loss, const int delay) {
+    char cmd[300];
+    int len;
+
+    len = snprintf(cmd, sizeof(cmd),
+                   "/sbin/tc qdisc del dev %s parent %x:%x handle %x pfifo", iface, parent_major,
+                   parent_minor, handle);
+    if (len < 0 || (size_t) len >= sizeof(cmd)) {
+        printlog(LOG_CRITICAL, "HTB_cmd: tc qdisc del command too long.\n");
+        return 1;
+    }
+    printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
+    if (execute_cmd(cmd))
+        printlog(LOG_DEBUG, "HTB_cmd: Previous deletion did not succeed.\n");
+
+    len = snprintf(cmd, sizeof(cmd),
+                   "/sbin/tc qdisc replace dev %s parent %x:%x handle %x netem loss %d%% delay %dms",
+                   iface, parent_major, parent_minor, handle, loss, delay);
+    if (len < 0 || (size_t) len >= sizeof(cmd)) {
+        printlog(LOG_CRITICAL, "HTB_cmd: tc qdisc replace command too long.\n");
+        return 1;
+    }
+    printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
+    return execute_cmd(cmd);
+}
+
static int create_htb_hierarchy(drl_instance_t *instance) {
char cmd[300];
int i, j, k;
+ uint64_t gigabit = 1024 * 1024 * 1024;
/* Nuke the hierarchy. */
sprintf(cmd, "tc qdisc del dev eth0 root handle 1: htb");
return 1;
}
printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
- sprintf(cmd, "tc class add dev eth0 parent 1: classid 1:1 htb rate 1000mbit ceil 1000mbit");
- if (execute_cmd(cmd)) {
+
+ if (add_htb_node("eth0", 1, 0, 1, 1, gigabit, gigabit))
return 1;
- }
- printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
- /* Add back 1:10. (Nodelimit : Megabits/sec -> bits/second)*/
+ /* Add back 1:10. (Nodelimit : kilobits/sec -> bits/second)*/
if (limiter.nodelimit) {
- sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:10 htb rate 8bit ceil %lubit",
- (unsigned long) limiter.nodelimit * 1024 * 1024);
+ if (add_htb_node("eth0", 1, 1, 1, 0x10, 8, (uint64_t) limiter.nodelimit * 1024))
+ return 1;
} else {
- sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:10 htb rate 8bit ceil 1000mbit");
- }
-
- if (execute_cmd(cmd)) {
- return 1;
+ if (add_htb_node("eth0", 1, 1, 1, 0x10, 8, gigabit))
+ return 1;
}
- printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
/* Add machines. */
for (i = 0; i < instance->machine_count; ++i) {
- sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %lubit",
- instance->machines[i]->htb_parent,
- instance->machines[i]->htb_node,
- (unsigned long) instance->machines[i]->limit * 1024 * 1024);
-
- if (execute_cmd(cmd)) {
+ if (add_htb_node("eth0", 1, instance->machines[i]->htb_parent, 1,
+ instance->machines[i]->htb_node, 8, instance->machines[i]->limit * 1024)) {
return 1;
}
- printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
}
#define LIMITEXEMPT
/* Add sets. */
for (j = (instance->set_count - 1); j >= 0; --j) {
- sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %lubit",
- instance->sets[j]->htb_parent,
- instance->sets[j]->htb_node,
- (unsigned long) instance->sets[j]->limit * 1024 * 1024);
-
- if (execute_cmd(cmd)) {
+ if (add_htb_node("eth0", 1, instance->sets[j]->htb_parent, 1,
+ instance->sets[j]->htb_node, 8, instance->sets[j]->limit * 1024)) {
return 1;
}
- printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
}
/* Add leaves. FIXME: Set static sliver limit as ceil here! */
for (k = 0; k < instance->leaf_count; ++k) {
if (instance->leaves[k].parent == NULL) {
- sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:10 classid 1:1%x htb rate 8bit ceil %lubit",
- instance->leaves[k].xid,
- (unsigned long) 100 * 1024 * 1024);
+ if (add_htb_node("eth0", 1, 0x10, 1, (0x1000 | instance->leaves[k].xid), 8, gigabit))
+ return 1;
} else {
- sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:1%x htb rate 8bit ceil %lubit",
- instance->leaves[k].parent->htb_node,
- instance->leaves[k].xid,
- (unsigned long) 100 * 1024 * 1024);
- }
-
- if (execute_cmd(cmd)) {
- return 1;
+ if (add_htb_node("eth0", 1, instance->leaves[k].parent->htb_node, 1, (0x1000 | instance->leaves[k].xid), 8, gigabit))
+ return 1;
}
- printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
-
- sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:20 classid 1:2%x htb rate 8bit ceil 1000mbit",
- instance->leaves[k].xid);
- if (execute_cmd(cmd)) {
+ /* Add exempt node for the leaf under 1:20 as 1:2<xid> */
+ if (add_htb_node("eth0", 1, 0x20, 1, (0x2000 | instance->leaves[k].xid), 8, gigabit))
return 1;
- }
- printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
}
/* Add 1:1000 and 1:2000 */
if (instance->last_machine == NULL) {
- sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:10 classid 1:1000 htb rate 8bit ceil 1000mbit");
+ if (add_htb_node("eth0", 1, 0x10, 1, 0x1000, 8, gigabit))
+ return 1;
} else {
- sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:1000 htb rate 8bit ceil 1000mbit",
- instance->last_machine->htb_node);
- }
-
- if (execute_cmd(cmd)) {
- return 1;
+ if (add_htb_node("eth0", 1, instance->last_machine->htb_node, 1, 0x1000, 8, gigabit))
+ return 1;
}
- printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
- sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:20 classid 1:2000 htb rate 8bit ceil 1000mbit");
- if (execute_cmd(cmd)) {
+ if (add_htb_node("eth0", 1, 0x20, 1, 0x2000, 8, gigabit))
return 1;
- }
- printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
/* Add 1:1fff and 1:2fff */
if (instance->last_machine == NULL) {
- sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:10 classid 1:1fff htb rate 8bit ceil 1000mbit");
+ if (add_htb_node("eth0", 1, 0x10, 1, 0x1fff, 8, gigabit))
+ return 1;
} else {
- sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:1fff htb rate 8bit ceil 1000mbit",
- instance->last_machine->htb_node);
+ if (add_htb_node("eth0", 1, instance->last_machine->htb_node, 1, 0x1fff, 8, gigabit))
+ return 1;
}
- if (execute_cmd(cmd)) {
+ if (add_htb_node("eth0", 1, 0x20, 1, 0x2fff, 8, gigabit))
return 1;
- }
- printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
- sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:20 classid 1:2fff htb rate 8bit ceil 1000mbit");
- if (execute_cmd(cmd)) {
- return 1;
+ /* Artificial delay or loss for experimentation. */
+ if (netem_delay.u.value || netem_loss.u.value) {
+ if (!strcmp(netem_slice.u.string, "ALL")) {
+ /* By default, netem applies to all leaves. */
+ if (add_htb_netem("eth0", 1, 0x1000, 0x1000, netem_loss.u.value, netem_delay.u.value))
+ return 1;
+ if (add_htb_netem("eth0", 1, 0x1fff, 0x1fff, netem_loss.u.value, netem_delay.u.value))
+ return 1;
+
+ for (k = 0; k < instance->leaf_count; ++k) {
+ if (add_htb_netem("eth0", 1, (0x1000 | instance->leaves[k].xid),
+ (0x1000 | instance->leaves[k].xid), netem_loss.u.value, netem_delay.u.value)) {
+ return 1;
+ }
+
+ //FIXME: add exempt delay/loss here on 0x2000 ... ?
+ }
+ } else {
+ /* netem_slice is not the default ALL value. Only apply netem
+ * to the slice that is set in netem_slice.u.string. */
+ uint32_t slice_xid;
+
+ sscanf(netem_slice.u.string, "%x", &slice_xid);
+
+ if (add_htb_netem("eth0", 1, slice_xid, slice_xid, netem_loss.u.value, netem_delay.u.value))
+ return 1;
+ }
}
- printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
+#if 0
#ifdef DELAY40MS
/* Only for artificial delay testing. */
sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo");
sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:11fa handle 11fa netem loss 0 delay 40ms");
execute_cmd(cmd);
/* End delay testing */
+#endif
#endif
return 0;
return false;
}
+ partition_set = partition.u.value;
+
pthread_rwlock_unlock(&limiter.limiter_lock);
if (pthread_create(&limiter.udp_recv_thread, NULL, limiter_receive_thread, NULL)) {
sigemptyset(&sigs);
sigaddset(&sigs, SIGHUP);
sigaddset(&sigs, SIGUSR1);
+ sigaddset(&sigs, SIGUSR2);
pthread_sigmask(SIG_BLOCK, &sigs, NULL);
while (1) {
sigemptyset(&sigs);
sigaddset(&sigs, SIGHUP);
sigaddset(&sigs, SIGUSR1);
+ sigaddset(&sigs, SIGUSR2);
err = sigwait(&sigs, &sig);
}
pthread_rwlock_unlock(&limiter.limiter_lock);
break;
+ case SIGUSR2:
+ do_partition = !do_partition;
+ break;
default:
/* Intentionally blank. */
break;
sigemptyset(&signal_mask);
sigaddset(&signal_mask, SIGHUP);
sigaddset(&signal_mask, SIGUSR1);
+ sigaddset(&signal_mask, SIGUSR2);
pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
if (pthread_create(&signal_thread, NULL, &signal_thread_func, NULL) != 0) {