X-Git-Url: http://git.onelab.eu/?p=distributedratelimiting.git;a=blobdiff_plain;f=drl%2Fulogd_DRL.c;h=f785d301b88892a3337278f3a11d72631bf7afbb;hp=5d7ca7ee697f22f714711733c4ac1fb3393116bb;hb=f83340496f632165030cc92cd98408a87082f6b1;hpb=0be9704d6b24d09ebd55beedec52758cb88c570b diff --git a/drl/ulogd_DRL.c b/drl/ulogd_DRL.c index 5d7ca7e..f785d30 100644 --- a/drl/ulogd_DRL.c +++ b/drl/ulogd_DRL.c @@ -115,8 +115,8 @@ /* DRL specifics */ #include "raterouter.h" #include "util.h" -#include "calendar.h" #include "ratetypes.h" /* needs util and pthread.h */ +#include "calendar.h" #include "logging.h" /* @@ -124,8 +124,40 @@ * Add the config options for DRL. */ -static config_entry_t drl_configfile = { +static config_entry_t partition = { .next = NULL, + .key = "partition_set", + .type = CONFIG_TYPE_INT, + .options = CONFIG_OPT_NONE, + .u = { .value = 0xfffffff }, +}; + +static config_entry_t netem_slice = { + .next = &partition, + .key = "netem_slice", + .type = CONFIG_TYPE_STRING, + .options = CONFIG_OPT_NONE, + .u = { .string = "ALL" }, +}; + +static config_entry_t netem_loss = { + .next = &netem_slice, + .key = "netem_loss", + .type = CONFIG_TYPE_INT, + .options = CONFIG_OPT_NONE, + .u = { .value = 0 }, +}; + +static config_entry_t netem_delay = { + .next = &netem_loss, + .key = "netem_delay", + .type = CONFIG_TYPE_INT, + .options = CONFIG_OPT_NONE, + .u = { .value = 0 }, +}; + +static config_entry_t drl_configfile = { + .next = &netem_delay, .key = "drl_configfile", .type = CONFIG_TYPE_STRING, .options = CONFIG_OPT_MANDATORY, @@ -229,26 +261,36 @@ uint32_t local_ip = 0; limiter_t limiter; extern FILE *logfile; extern uint8_t system_loglevel; +extern uint8_t do_enforcement; + +/* From peer_comm.c - used to simulate partition. */ +extern int do_partition; +extern int partition_set; /* functions */ static inline uint32_t -hash_flow(uint8_t protocol, uint32_t src_ip, uint16_t src_port, uint32_t dst_ip, uint16_t dst_port) +hash_flow(uint8_t protocol, uint32_t src_ip, uint16_t src_port, uint32_t dst_ip, uint16_t dst_port, uint32_t hash_max) { unsigned char mybytes[FLOWKEYSIZE]; mybytes[0] = protocol; *(uint32_t*)(&(mybytes[1])) = src_ip; *(uint32_t*)(&(mybytes[5])) = dst_ip; *(uint32_t*)(&(mybytes[9])) = (src_port << 16) | dst_port; - return jhash(mybytes,FLOWKEYSIZE,salt) & (FLOW_HASH_SIZE - 1); + return jhash(mybytes,FLOWKEYSIZE,salt) & (hash_max - 1); } uint32_t sampled_hasher(const key_flow *key) { - return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port); + /* Last arg is UINT_MAX because sampled flow keeps track of its own capacity. */ + return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, UINT_MAX); } uint32_t standard_hasher(const key_flow *key) { - return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port); + return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, STD_FLOW_HASH_SIZE); +} + +uint32_t multiple_hasher(const key_flow *key) { + return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, MUL_FLOW_HASH_SIZE); } struct intr_id { @@ -417,6 +459,13 @@ static int _output_drl(ulog_iret_t *res) /* Update the identity's table. */ ident->table_sample_function(ident->table, &key); +#ifdef SHADOW_ACCTING + + /* Update the shadow perfect copy of the accounting table. */ + standard_table_sample((standard_flow_table) ident->shadow_table, &key); + +#endif + pthread_mutex_unlock(&ident->table_mutex); ident = ident->parent; @@ -454,6 +503,14 @@ static void free_identity(identity_t *ident) { ident->table_destroy_function(ident->table); } + if (ident->loop_action) { + ident->loop_action->valid = 0; + } + + if (ident->comm_action) { + ident->comm_action->valid = 0; + } + pthread_mutex_destroy(&ident->table_mutex); free(ident); @@ -482,6 +539,8 @@ static void free_instance(drl_instance_t *instance) { free(instance->machines); if (instance->sets) free(instance->sets); + + /* FIXME: Drain the calendar first and free all the entries. */ if (instance->cal) { free(instance->cal); } @@ -520,11 +579,12 @@ static identity_t *new_identity(ident_config *config) { memset(comm_nodes, 0, config->peer_count * sizeof(remote_node_t)); ident->id = config->id; - ident->limit = (uint32_t) (((double) config->limit * 1000000.0) / 8.0); + ident->limit = (uint32_t) (((double) config->limit * 1000.0) / 8.0); ident->fixed_ewma_weight = config->fixed_ewma_weight; - ident->intervals = config->intervals; + ident->communication_intervals = config->communication_intervals; + ident->mainloop_intervals = config->mainloop_intervals; ident->ewma_weight = pow(ident->fixed_ewma_weight, - (limiter.estintms/1000.0) * config->intervals); + (limiter.estintms/1000.0) * config->mainloop_intervals); ident->parent = NULL; pthread_mutex_init(&ident->table_mutex, NULL); @@ -545,10 +605,24 @@ static identity_t *new_identity(ident_config *config) { (void (*)(void *)) standard_table_destroy; break; + case ACT_MULTIPLE: + ident->table = + multiple_table_create(multiple_hasher, MUL_INTERVAL_COUNT, &ident->common); + + ident->table_sample_function = + (int (*)(void *, const key_flow *)) multiple_table_sample; + ident->table_cleanup_function = + (int (*)(void *)) multiple_table_cleanup; + ident->table_update_function = + (void (*)(void *, struct timeval, double)) multiple_table_update_flows; + ident->table_destroy_function = + (void (*)(void *)) multiple_table_destroy; + break; + case ACT_SAMPLEHOLD: ident->table = sampled_table_create(sampled_hasher, ident->limit * IDENT_CLEAN_INTERVAL, - 1, 20, &ident->common); + SAMPLEHOLD_PERCENTAGE, SAMPLEHOLD_OVERFACTOR, &ident->common); ident->table_sample_function = (int (*)(void *, const key_flow *)) sampled_table_sample; @@ -574,6 +648,18 @@ static identity_t *new_identity(ident_config *config) { break; } +#ifdef SHADOW_ACCTING + + ident->shadow_table = standard_table_create(standard_hasher, &ident->shadow_common); + + if (ident->shadow_table == NULL) { + ident->table_destroy_function(ident->table); + free(ident); + return NULL; + } + +#endif + /* Make sure the table was allocated. */ if (ident->table == NULL) { free(ident); @@ -620,10 +706,11 @@ static int validate_config(ident_config *config) { } /* Accounting must be a valid choice (ACT_STANDARD, ACT_SAMPLEHOLD, - * ACT_SIMPLE). */ + * ACT_SIMPLE, ACT_MULTIPLE). */ if (config->accounting != ACT_STANDARD && config->accounting != ACT_SAMPLEHOLD && - config->accounting != ACT_SIMPLE) { + config->accounting != ACT_SIMPLE && + config->accounting != ACT_MULTIPLE) { return 1; } @@ -766,8 +853,37 @@ static int validate_configs(parsed_configs configs, drl_instance_t *instance) { return 0; } +static int fill_set_leaf_pointer(drl_instance_t *instance, identity_t *ident) { + int count = 0; + identity_t *current_ident; + leaf_t *current_leaf; + leaf_t **leaves = malloc(instance->leaf_count * sizeof(leaf_t *)); + if (leaves == NULL) { + return 1; + } + + map_reset_iterate(instance->leaf_map); + while ((current_leaf = (leaf_t *) map_next(instance->leaf_map))) { + current_ident = current_leaf->parent; + while (current_ident != NULL && current_ident != instance->last_machine) { + if (current_ident == ident) { + /* Found the ident we were looking for - add the leaf. */ + leaves[count] = current_leaf; + count += 1; + break; + } + current_ident = current_ident->parent; + } + } + + ident->leaves = leaves; + ident->leaf_count = count; + + return 0; +} + static int init_identities(parsed_configs configs, drl_instance_t *instance) { - int i; + int i, j; ident_config *config = configs.machines; leaf_t *leaf = NULL; @@ -793,12 +909,21 @@ static int init_identities(parsed_configs configs, drl_instance_t *instance) { /* Allocate and add the machine identities. */ for (i = 0; i < configs.machine_count; ++i) { + identity_action *loop_action; + identity_action *comm_action; instance->machines[i] = new_identity(config); if (instance->machines[i] == NULL) { return ENOMEM; } + loop_action = malloc(sizeof(identity_action)); + comm_action = malloc(sizeof(identity_action)); + + if (loop_action == NULL || comm_action == NULL) { + return ENOMEM; + } + /* The first has no parent - it is the root. All others have the * previous ident as their parent. */ if (i == 0) { @@ -815,8 +940,35 @@ static int init_identities(parsed_configs configs, drl_instance_t *instance) { config = config->next; + memset(loop_action, 0, sizeof(identity_action)); + memset(comm_action, 0, sizeof(identity_action)); + loop_action->ident = instance->machines[i]; + loop_action->action = ACTION_MAINLOOP; + loop_action->valid = 1; + comm_action->ident = instance->machines[i]; + comm_action->action = ACTION_COMMUNICATE; + comm_action->valid = 1; + + instance->machines[i]->loop_action = loop_action; + instance->machines[i]->comm_action = comm_action; + + TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK), + loop_action, calendar); + TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK), - instance->machines[i], calendar); + comm_action, calendar); + + /* Setup the array of pointers to leaves. This is easy for machines + * because a machine node applies to every leaf. */ + instance->machines[i]->leaves = + malloc(instance->leaf_count * sizeof(leaf_t *)); + if (instance->machines[i]->leaves == NULL) { + return ENOMEM; + } + instance->machines[i]->leaf_count = instance->leaf_count; + for (j = 0; j < instance->leaf_count; ++j) { + instance->machines[i]->leaves[j] = &instance->leaves[j]; + } } /* Connect the set subtree to the machines. Any set or leaf without a @@ -832,12 +984,44 @@ static int init_identities(parsed_configs configs, drl_instance_t *instance) { /* Sets... */ for (i = 0; i < instance->set_count; ++i) { + identity_action *loop_action; + identity_action *comm_action; + if (instance->sets[i]->parent == NULL) { instance->sets[i]->parent = instance->last_machine; } + loop_action = malloc(sizeof(identity_action)); + comm_action = malloc(sizeof(identity_action)); + + if (loop_action == NULL || comm_action == NULL) { + return ENOMEM; + } + + memset(loop_action, 0, sizeof(identity_action)); + memset(comm_action, 0, sizeof(identity_action)); + loop_action->ident = instance->sets[i]; + loop_action->action = ACTION_MAINLOOP; + loop_action->valid = 1; + comm_action->ident = instance->sets[i]; + comm_action->action = ACTION_COMMUNICATE; + comm_action->valid = 1; + + instance->sets[i]->loop_action = loop_action; + instance->sets[i]->comm_action = comm_action; + + TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK), + loop_action, calendar); + TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK), - instance->sets[i], calendar); + comm_action, calendar); + + /* Setup the array of pointers to leaves. This is harder for sets, + * but this doesn't need to be super-efficient because it happens + * rarely and it isn't on the critical path for reconfig(). */ + if (fill_set_leaf_pointer(instance, instance->sets[i])) { + return ENOMEM; + } } /* Success. */ @@ -848,23 +1032,25 @@ static void print_instance(drl_instance_t *instance) { leaf_t *leaf = NULL; identity_t *ident = NULL; - map_reset_iterate(instance->leaf_map); - while ((leaf = (leaf_t *) map_next(instance->leaf_map))) { - printf("%x:", leaf->xid); - ident = leaf->parent; - while (ident) { - printf("%d:",ident->id); - ident = ident->parent; + if (system_loglevel == LOG_DEBUG) { + map_reset_iterate(instance->leaf_map); + while ((leaf = (leaf_t *) map_next(instance->leaf_map))) { + printf("%x:", leaf->xid); + ident = leaf->parent; + while (ident) { + printf("%d:",ident->id); + ident = ident->parent; + } + printf("Leaf's parent pointer is %p\n", leaf->parent); } - printf("Leaf's parent pointer is %p\n", leaf->parent); - } - printf("instance->last_machine is %p\n", instance->last_machine); + printf("instance->last_machine is %p\n", instance->last_machine); + } } static int assign_htb_hierarchy(drl_instance_t *instance) { int i, j; - int next_node = 0x11; + int next_node = 0x100; /* Chain machine nodes under 1:10. */ for (i = 0; i < instance->machine_count; ++i) { @@ -874,7 +1060,7 @@ static int assign_htb_hierarchy(drl_instance_t *instance) { } else { /* Pointerific! */ instance->machines[i]->htb_parent = - instance->machines[i]->parent->htb_node; + instance->machines[i]->parent->htb_node; } instance->machines[i]->htb_node = next_node; @@ -901,13 +1087,43 @@ static int assign_htb_hierarchy(drl_instance_t *instance) { /* Added this so that I could comment one line and kill off all of the * command execution. */ -static int execute_cmd(const char *cmd) { +static inline int execute_cmd(const char *cmd) { return system(cmd); } +static inline int add_htb_node(const char *iface, const uint32_t parent_major, const uint32_t parent_minor, + const uint32_t classid_major, const uint32_t classid_minor, + const uint64_t rate, const uint64_t ceil) { + char cmd[300]; + + sprintf(cmd, "tc class add dev %s parent %x:%x classid %x:%x htb rate %llubit ceil %llubit", + iface, parent_major, parent_minor, classid_major, classid_minor, rate, ceil); + printlog(LOG_WARN, "INIT: HTB_cmd: %s\n", cmd); + + return execute_cmd(cmd); +} + +static inline int add_htb_netem(const char *iface, const uint32_t parent_major, + const uint32_t parent_minor, const uint32_t handle, + const int loss, const int delay) { + char cmd[300]; + + sprintf(cmd, "/sbin/tc qdisc del dev %s parent %x:%x handle %x pfifo", iface, parent_major, + parent_minor, handle); + printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); + if (execute_cmd(cmd)) + printlog(LOG_DEBUG, "HTB_cmd: Previous deletion did not succeed.\n"); + + sprintf(cmd, "/sbin/tc qdisc replace dev %s parent %x:%x handle %x netem loss %d%% delay %dms", + iface, parent_major, parent_minor, handle, loss, delay); + printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); + return execute_cmd(cmd); +} + static int create_htb_hierarchy(drl_instance_t *instance) { char cmd[300]; int i, j, k; + uint64_t gigabit = 1024 * 1024 * 1024; /* Nuke the hierarchy. */ sprintf(cmd, "tc qdisc del dev eth0 root handle 1: htb"); @@ -920,124 +1136,208 @@ static int create_htb_hierarchy(drl_instance_t *instance) { return 1; } printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); - sprintf(cmd, "tc class add dev eth0 parent 1: classid 1:1 htb rate 1000mbit ceil 1000mbit"); - if (execute_cmd(cmd)) { + + if (add_htb_node("eth0", 1, 0, 1, 1, gigabit, gigabit)) return 1; - } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); - /* Add back 1:10. (Nodelimit : Megabits/sec -> bits/second)*/ + /* Add back 1:10. (Nodelimit : kilobits/sec -> bits/second)*/ if (limiter.nodelimit) { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:10 htb rate 8bit ceil %lubit", - (unsigned long) limiter.nodelimit * 1024 * 1024); + if (add_htb_node("eth0", 1, 1, 1, 0x10, 8, (uint64_t) limiter.nodelimit * 1024)) + return 1; } else { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:10 htb rate 8bit ceil 1000mbit"); + if (add_htb_node("eth0", 1, 1, 1, 0x10, 8, gigabit)) + return 1; } - if (execute_cmd(cmd)) { - return 1; + /* Add machines. */ + for (i = 0; i < instance->machine_count; ++i) { + if (add_htb_node("eth0", 1, instance->machines[i]->htb_parent, 1, + instance->machines[i]->htb_node, 8, instance->machines[i]->limit * 1024)) { + return 1; + } } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); + +#define LIMITEXEMPT /* Add back 1:20. */ +#ifdef LIMITEXEMPT + if (instance->last_machine == NULL) { + sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:20 htb rate 8bit ceil 1000mbit"); + } else { + sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:20 htb rate 8bit ceil 1000mbit", + instance->last_machine->htb_node); + } +#else sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:20 htb rate 8bit ceil 1000mbit"); +#endif if (execute_cmd(cmd)) { return 1; } printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); - - /* Add machines. */ - for (i = 0; i < instance->machine_count; ++i) { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %lubit", - instance->machines[i]->htb_parent, - instance->machines[i]->htb_node, - (unsigned long) instance->machines[i]->limit * 1024 * 1024); - - if (execute_cmd(cmd)) { - return 1; - } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); - } - /* Add sets. */ for (j = (instance->set_count - 1); j >= 0; --j) { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %lubit", - instance->sets[j]->htb_parent, - instance->sets[j]->htb_node, - (unsigned long) instance->sets[j]->limit * 1024 * 1024); - - if (execute_cmd(cmd)) { + if (add_htb_node("eth0", 1, instance->sets[j]->htb_parent, 1, + instance->sets[j]->htb_node, 8, instance->sets[j]->limit * 1024)) { return 1; } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); } /* Add leaves. FIXME: Set static sliver limit as ceil here! */ for (k = 0; k < instance->leaf_count; ++k) { if (instance->leaves[k].parent == NULL) { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:10 classid 1:1%x htb rate 8bit ceil %lubit", - instance->leaves[k].xid, - (unsigned long) 100 * 1024 * 1024); + if (add_htb_node("eth0", 1, 0x10, 1, (0x1000 | instance->leaves[k].xid), 8, gigabit)) + return 1; } else { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:1%x htb rate 8bit ceil %lubit", - instance->leaves[k].parent->htb_node, - instance->leaves[k].xid, - (unsigned long) 100 * 1024 * 1024); + if (add_htb_node("eth0", 1, instance->leaves[k].parent->htb_node, 1, (0x1000 | instance->leaves[k].xid), 8, gigabit)) + return 1; } - if (execute_cmd(cmd)) { - return 1; - } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); - - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:20 classid 1:2%x htb rate 8bit ceil 1000mbit", - instance->leaves[k].xid); - - if (execute_cmd(cmd)) { + /* Add exempt node for the leaf under 1:20 as 1:2 */ + if (add_htb_node("eth0", 1, 0x20, 1, (0x2000 | instance->leaves[k].xid), 8, gigabit)) return 1; - } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); } /* Add 1:1000 and 1:2000 */ if (instance->last_machine == NULL) { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:10 classid 1:1000 htb rate 8bit ceil 1000mbit"); + if (add_htb_node("eth0", 1, 0x10, 1, 0x1000, 8, gigabit)) + return 1; } else { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:1000 htb rate 8bit ceil 1000mbit", - instance->last_machine->htb_node); + if (add_htb_node("eth0", 1, instance->last_machine->htb_node, 1, 0x1000, 8, gigabit)) + return 1; } - if (execute_cmd(cmd)) { + if (add_htb_node("eth0", 1, 0x20, 1, 0x2000, 8, gigabit)) return 1; + + /* Add 1:1fff and 1:2fff */ + if (instance->last_machine == NULL) { + if (add_htb_node("eth0", 1, 0x10, 1, 0x1fff, 8, gigabit)) + return 1; + } else { + if (add_htb_node("eth0", 1, instance->last_machine->htb_node, 1, 0x1fff, 8, gigabit)) + return 1; } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:20 classid 1:2000 htb rate 8bit ceil 1000mbit"); - if (execute_cmd(cmd)) { + if (add_htb_node("eth0", 1, 0x20, 1, 0x2fff, 8, gigabit)) return 1; + + /* Artifical delay or loss for experimentation. */ + if (netem_delay.u.value || netem_loss.u.value) { + if (!strcmp(netem_slice.u.string, "ALL")) { + /* By default, netem applies to all leaves. */ + if (add_htb_netem("eth0", 1, 0x1000, 0x1000, netem_loss.u.value, netem_delay.u.value)) + return 1; + if (add_htb_netem("eth0", 1, 0x1fff, 0x1fff, netem_loss.u.value, netem_delay.u.value)) + return 1; + + for (k = 0; k < instance->leaf_count; ++k) { + if (add_htb_netem("eth0", 1, (0x1000 | instance->leaves[k].xid), + (0x1000 | instance->leaves[k].xid), netem_loss.u.value, netem_delay.u.value)) { + return 1; + } + + //FIXME: add exempt delay/loss here on 0x2000 ... ? + } + } else { + /* netem_slice is not the default ALL value. Only apply netem + * to the slice that is set in netem_slice.u.string. */ + uint32_t slice_xid; + + sscanf(netem_slice.u.string, "%x", &slice_xid); + + if (add_htb_netem("eth0", 1, slice_xid, slice_xid, netem_loss.u.value, netem_delay.u.value)) + return 1; + } } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); - /* Add 1:1fff and 1:2fff */ - if (instance->last_machine == NULL) { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:10 classid 1:1fff htb rate 8bit ceil 1000mbit"); - } else { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:1fff htb rate 8bit ceil 1000mbit", - instance->last_machine->htb_node); +#if 0 +#ifdef DELAY40MS + /* Only for artificial delay testing. */ + sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo"); + execute_cmd(cmd); + + sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 40ms"); + execute_cmd(cmd); + sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:11f9 handle 11f9 pfifo"); + execute_cmd(cmd); + + sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:11f9 handle 11f9 netem loss 0 delay 40ms"); + execute_cmd(cmd); + sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:11fa handle 11fa pfifo"); + execute_cmd(cmd); + + sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:11fa handle 11fa netem loss 0 delay 40ms"); + execute_cmd(cmd); + /* End delay testing */ +#endif +#endif + + return 0; +} + +static int setup_tc_grd(drl_instance_t *instance) { + int i; + char cmd[300]; + + for (i = 0; i < instance->leaf_count; ++i) { + /* Delete the old pfifo qdisc that might have been there before. */ + sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1%x handle 1%x pfifo", + instance->leaves[i].xid, instance->leaves[i].xid); + + if (execute_cmd(cmd)) { + printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n"); + } + + /* Add the netem qdisc. */ +#ifdef DELAY40MS + sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 40ms", + instance->leaves[i].xid, instance->leaves[i].xid); +#else + sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 0ms", + instance->leaves[i].xid, instance->leaves[i].xid); +#endif + + if (execute_cmd(cmd)) { + return 1; + } } + /* Do the same for 1000 and 1fff. */ + sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo"); + + if (execute_cmd(cmd)) { + printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n"); + } + + /* Add the netem qdisc. */ +#ifdef DELAY40MS + sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 40ms"); +#else + sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 0ms"); +#endif + if (execute_cmd(cmd)) { return 1; } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:20 classid 1:2fff htb rate 8bit ceil 1000mbit"); + + sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1fff handle 1fff pfifo"); + + if (execute_cmd(cmd)) { + printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n"); + } + + /* Add the netem qdisc. */ +#ifdef DELAY40MS + sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 40ms"); +#else + sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 0ms"); +#endif if (execute_cmd(cmd)) { return 1; } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); return 0; } @@ -1097,9 +1397,9 @@ static int init_drl(void) { printlog(LOG_WARN, " POLICY: %s\n",policy.u.string); if (strcasecmp(policy.u.string,"GRD") == 0) { - limiter.policynum = POLICY_GRD; + limiter.policy = POLICY_GRD; } else if (strcasecmp(policy.u.string,"FPS") == 0) { - limiter.policynum = POLICY_FPS; + limiter.policy = POLICY_FPS; } else { printlog(LOG_CRITICAL, "Unknown DRL policy %s, aborting.\n",policy.u.string); @@ -1133,6 +1433,8 @@ static int init_drl(void) { if (parse_drl_config(drl_configfile.u.string, &configs)) { /* Parse error occured. Return non-zero to notify init_drl(). */ + printlog(LOG_CRITICAL, "Failed to parse the DRL configuration file (%s).\n", + drl_configfile.u.string); return false; } @@ -1140,11 +1442,14 @@ static int init_drl(void) { if (validate_configs(configs, &limiter.stable_instance)) { /* Clean up everything. */ free_failed_config(configs, &limiter.stable_instance); + printlog(LOG_CRITICAL, "Invalid DRL configuration file (%s).\n", + drl_configfile.u.string); return false; } if (init_identities(configs, &limiter.stable_instance)) { free_failed_config(configs, &limiter.stable_instance); + printlog(LOG_CRITICAL, "Failed to initialize identities.\n"); return false; } @@ -1155,16 +1460,35 @@ static int init_drl(void) { /* Debugging - FIXME: remove this? */ print_instance(&limiter.stable_instance); - if (assign_htb_hierarchy(&limiter.stable_instance)) { - free_instance(&limiter.stable_instance); - return false; - } + switch (limiter.policy) { + case POLICY_FPS: + if (assign_htb_hierarchy(&limiter.stable_instance)) { + free_instance(&limiter.stable_instance); + printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy.\n"); + return false; + } + + if (create_htb_hierarchy(&limiter.stable_instance)) { + free_instance(&limiter.stable_instance); + printlog(LOG_CRITICAL, "Failed to create HTB hierarchy.\n"); + return false; + } + break; + + case POLICY_GRD: + if (setup_tc_grd(&limiter.stable_instance)) { + free_instance(&limiter.stable_instance); + printlog(LOG_CRITICAL, "Failed to initialize tc calls for GRD.\n"); + return false; + } + break; - if (create_htb_hierarchy(&limiter.stable_instance)) { - free_instance(&limiter.stable_instance); + default: return false; } + partition_set = partition.u.value; + pthread_rwlock_unlock(&limiter.limiter_lock); if (pthread_create(&limiter.udp_recv_thread, NULL, limiter_receive_thread, NULL)) { @@ -1204,9 +1528,6 @@ static void reconfig() { return; } - /* Lock */ - pthread_rwlock_wrlock(&limiter.limiter_lock); - if (validate_configs(configs, &limiter.new_instance)) { free_failed_config(configs, &limiter.new_instance); printlog(LOG_CRITICAL, "Validation failed during reconfig().\n"); @@ -1226,28 +1547,56 @@ static void reconfig() { /* Debugging - FIXME: remove this? */ print_instance(&limiter.new_instance); + + /* Lock */ + pthread_rwlock_wrlock(&limiter.limiter_lock); - if (assign_htb_hierarchy(&limiter.new_instance)) { - free_instance(&limiter.new_instance); - printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy during reconfig().\n"); - pthread_rwlock_unlock(&limiter.limiter_lock); - return; - } + switch (limiter.policy) { + case POLICY_FPS: + if (assign_htb_hierarchy(&limiter.new_instance)) { + free_instance(&limiter.new_instance); + printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy during reconfig().\n"); + pthread_rwlock_unlock(&limiter.limiter_lock); + return; + } - if (create_htb_hierarchy(&limiter.new_instance)) { - free_instance(&limiter.new_instance); - printlog(LOG_CRITICAL, "Failed to create HTB hierarchy during reconfig().\n"); + if (create_htb_hierarchy(&limiter.new_instance)) { + free_instance(&limiter.new_instance); + printlog(LOG_CRITICAL, "Failed to create HTB hierarchy during reconfig().\n"); + + /* Re-create old instance. */ + if (create_htb_hierarchy(&limiter.stable_instance)) { + /* Error reinstating the old one - big problem. */ + printlog(LOG_CRITICAL, "Failed to reinstate HTB hierarchy during reconfig().\n"); + printlog(LOG_CRITICAL, "Giving up...\n"); + flushlog(); + exit(EXIT_FAILURE); + } - /* Re-create old instance. */ - if (create_htb_hierarchy(&limiter.stable_instance)) { - /* Error reinstating the old one - big problem. */ - printlog(LOG_CRITICAL, "Failed to reinstate HTB hierarchy during reconfig().\n"); - flushlog(); - exit(EXIT_FAILURE); - } + pthread_rwlock_unlock(&limiter.limiter_lock); + return; + } + break; + + case POLICY_GRD: + if (setup_tc_grd(&limiter.new_instance)) { + free_instance(&limiter.new_instance); + printlog(LOG_CRITICAL, "GRD tc calls failed during reconfig().\n"); + + /* Try to re-create old instance. */ + if (setup_tc_grd(&limiter.stable_instance)) { + printlog(LOG_CRITICAL, "Failed to reinstate old GRD qdiscs during reconfig().\n"); + printlog(LOG_CRITICAL, "Giving up...\n"); + flushlog(); + exit(EXIT_FAILURE); + } + } + break; - pthread_rwlock_unlock(&limiter.limiter_lock); - return; + default: + /* Should be impossible. */ + printf("Pigs are flying?\n"); + exit(EXIT_FAILURE); } /* Switch over new to stable instance. */ @@ -1284,6 +1633,33 @@ static void time_reconfig(int iterations) { // Seems to take about 85ms / iteration } +static int stop_enforcement(drl_instance_t *instance) { + char cmd[300]; + int i; + + for (i = 0; i < instance->machine_count; ++i) { + sprintf(cmd, "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil 100mbit", + instance->machines[i]->htb_parent, + instance->machines[i]->htb_node); + + if (execute_cmd(cmd)) { + return 1; + } + } + + for (i = 0; i < instance->set_count; ++i) { + sprintf(cmd, "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil 100mbit", + instance->sets[i]->htb_parent, + instance->sets[i]->htb_node); + + if (execute_cmd(cmd)) { + return 1; + } + } + + return 0; +} + static void *signal_thread_func(void *args) { int sig; int err; @@ -1291,11 +1667,15 @@ static void *signal_thread_func(void *args) { sigemptyset(&sigs); sigaddset(&sigs, SIGHUP); + sigaddset(&sigs, SIGUSR1); + sigaddset(&sigs, SIGUSR2); pthread_sigmask(SIG_BLOCK, &sigs, NULL); while (1) { sigemptyset(&sigs); sigaddset(&sigs, SIGHUP); + sigaddset(&sigs, SIGUSR1); + sigaddset(&sigs, SIGUSR2); err = sigwait(&sigs, &sig); @@ -1308,11 +1688,26 @@ static void *signal_thread_func(void *args) { case SIGHUP: printlog(LOG_WARN, "Caught SIGHUP - re-reading XML file.\n"); reconfig(); - //time_reconfig(1000); //instrumentation + //time_reconfig(1000); /* instrumentation */ flushlog(); break; + case SIGUSR1: + pthread_rwlock_wrlock(&limiter.limiter_lock); + if (do_enforcement) { + do_enforcement = 0; + stop_enforcement(&limiter.stable_instance); + printlog(LOG_CRITICAL, "--Switching enforcement off.--\n"); + } else { + do_enforcement = 1; + printlog(LOG_CRITICAL, "--Switching enforcement on.--\n"); + } + pthread_rwlock_unlock(&limiter.limiter_lock); + break; + case SIGUSR2: + do_partition = !do_partition; + break; default: - /* Should be impossible... */ + /* Intentionally blank. */ break; } } @@ -1327,6 +1722,8 @@ static void _drl_reg_op(void) sigemptyset(&signal_mask); sigaddset(&signal_mask, SIGHUP); + sigaddset(&signal_mask, SIGUSR1); + sigaddset(&signal_mask, SIGUSR2); pthread_sigmask(SIG_BLOCK, &signal_mask, NULL); if (pthread_create(&signal_thread, NULL, &signal_thread_func, NULL) != 0) { @@ -1350,8 +1747,7 @@ static void _drl_reg_op(void) * see estimate.c */ if (pthread_create(&estimate_thread, NULL, (void*(*)(void*)) &handle_estimation, &limiter)!=0) { - ulogd_log(ULOGD_ERROR, "couldn't start estimate thread for 0x%x %s\n",limiter.localaddr, - limiter.ip); + printlog(LOG_CRITICAL, "Couldn't start estimate thread.\n"); fprintf(stderr, "An error has occured starting ulogd_DRL. Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string); exit(EXIT_FAILURE); }