X-Git-Url: http://git.onelab.eu/?p=distributedratelimiting.git;a=blobdiff_plain;f=drl%2Fulogd_DRL.c;h=f7b027256901326322647c007087b9982b726cb8;hp=bc82995b243f874cbb31b57fb365361f7e2dd20c;hb=89df43dd6b8cb8df82cfbf395b923014d2826b5a;hpb=5196115d2999ca57181ef592e7907d7afb14c1b3 diff --git a/drl/ulogd_DRL.c b/drl/ulogd_DRL.c index bc82995..f7b0272 100644 --- a/drl/ulogd_DRL.c +++ b/drl/ulogd_DRL.c @@ -115,8 +115,8 @@ /* DRL specifics */ #include "raterouter.h" #include "util.h" -#include "calendar.h" #include "ratetypes.h" /* needs util and pthread.h */ +#include "calendar.h" #include "logging.h" /* @@ -124,8 +124,64 @@ * Add the config options for DRL. */ -static config_entry_t drl_configfile = { +static config_entry_t create_htb = { .next = NULL, + .key = "create_htb", + .type = CONFIG_TYPE_INT, + .options = CONFIG_OPT_NONE, + .u = { .value = 1 }, +}; + +static config_entry_t enforce_on = { + .next = &create_htb, + .key = "enforce_on", + .type = CONFIG_TYPE_INT, + .options = CONFIG_OPT_NONE, + .u = { .value = 1 }, +}; + +static config_entry_t partition = { + .next = &enforce_on, + .key = "partition_set", + .type = CONFIG_TYPE_INT, + .options = CONFIG_OPT_NONE, + .u = { .value = 0xfffffff }, +}; + +static config_entry_t sfq_slice = { + .next = &partition, + .key = "sfq_slice", + .type = CONFIG_TYPE_STRING, + .options = CONFIG_OPT_NONE, + .u = { .string = "NONE" }, +}; + +static config_entry_t netem_slice = { + .next = &sfq_slice, + .key = "netem_slice", + .type = CONFIG_TYPE_STRING, + .options = CONFIG_OPT_NONE, + .u = { .string = "ALL" }, +}; + +static config_entry_t netem_loss = { + .next = &netem_slice, + .key = "netem_loss", + .type = CONFIG_TYPE_INT, + .options = CONFIG_OPT_NONE, + .u = { .value = 0 }, +}; + +static config_entry_t netem_delay = { + .next = &netem_loss, + .key = "netem_delay", + .type = CONFIG_TYPE_INT, + .options = CONFIG_OPT_NONE, + .u = { .value = 0 }, +}; + +static config_entry_t drl_configfile = { + .next = &netem_delay, .key = "drl_configfile", .type = CONFIG_TYPE_STRING, .options = CONFIG_OPT_MANDATORY, @@ -231,25 +287,34 @@ extern FILE *logfile; extern uint8_t system_loglevel; extern uint8_t do_enforcement; +/* Used to simulate partitions. */ +int do_partition = 0; +int partition_set = 0xfffffff; + /* functions */ static inline uint32_t -hash_flow(uint8_t protocol, uint32_t src_ip, uint16_t src_port, uint32_t dst_ip, uint16_t dst_port) +hash_flow(uint8_t protocol, uint32_t src_ip, uint16_t src_port, uint32_t dst_ip, uint16_t dst_port, uint32_t hash_max) { unsigned char mybytes[FLOWKEYSIZE]; mybytes[0] = protocol; *(uint32_t*)(&(mybytes[1])) = src_ip; *(uint32_t*)(&(mybytes[5])) = dst_ip; *(uint32_t*)(&(mybytes[9])) = (src_port << 16) | dst_port; - return jhash(mybytes,FLOWKEYSIZE,salt) & (FLOW_HASH_SIZE - 1); + return jhash(mybytes,FLOWKEYSIZE,salt) & (hash_max - 1); } uint32_t sampled_hasher(const key_flow *key) { - return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port); + /* Last arg is UINT_MAX because sampled flow keeps track of its own capacity. */ + return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, UINT_MAX); } uint32_t standard_hasher(const key_flow *key) { - return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port); + return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, STD_FLOW_HASH_SIZE); +} + +uint32_t multiple_hasher(const key_flow *key) { + return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, MUL_FLOW_HASH_SIZE); } struct intr_id { @@ -418,6 +483,13 @@ static int _output_drl(ulog_iret_t *res) /* Update the identity's table. */ ident->table_sample_function(ident->table, &key); +#ifdef SHADOW_ACCTING + + /* Update the shadow perfect copy of the accounting table. */ + standard_table_sample((standard_flow_table) ident->shadow_table, &key); + +#endif + pthread_mutex_unlock(&ident->table_mutex); ident = ident->parent; @@ -455,6 +527,14 @@ static void free_identity(identity_t *ident) { ident->table_destroy_function(ident->table); } + if (ident->loop_action) { + ident->loop_action->valid = 0; + } + + if (ident->comm_action) { + ident->comm_action->valid = 0; + } + pthread_mutex_destroy(&ident->table_mutex); free(ident); @@ -483,6 +563,8 @@ static void free_instance(drl_instance_t *instance) { free(instance->machines); if (instance->sets) free(instance->sets); + + /* FIXME: Drain the calendar first and free all the entries. */ if (instance->cal) { free(instance->cal); } @@ -523,10 +605,12 @@ static identity_t *new_identity(ident_config *config) { ident->id = config->id; ident->limit = (uint32_t) (((double) config->limit * 1000.0) / 8.0); ident->fixed_ewma_weight = config->fixed_ewma_weight; - ident->intervals = config->intervals; + ident->communication_intervals = config->communication_intervals; + ident->mainloop_intervals = config->mainloop_intervals; ident->ewma_weight = pow(ident->fixed_ewma_weight, - (limiter.estintms/1000.0) * config->intervals); + (limiter.estintms/1000.0) * config->mainloop_intervals); ident->parent = NULL; + ident->independent = config->independent; pthread_mutex_init(&ident->table_mutex, NULL); switch (config->accounting) { @@ -546,10 +630,24 @@ static identity_t *new_identity(ident_config *config) { (void (*)(void *)) standard_table_destroy; break; + case ACT_MULTIPLE: + ident->table = + multiple_table_create(multiple_hasher, MUL_INTERVAL_COUNT, &ident->common); + + ident->table_sample_function = + (int (*)(void *, const key_flow *)) multiple_table_sample; + ident->table_cleanup_function = + (int (*)(void *)) multiple_table_cleanup; + ident->table_update_function = + (void (*)(void *, struct timeval, double)) multiple_table_update_flows; + ident->table_destroy_function = + (void (*)(void *)) multiple_table_destroy; + break; + case ACT_SAMPLEHOLD: ident->table = sampled_table_create(sampled_hasher, ident->limit * IDENT_CLEAN_INTERVAL, - 1, 20, &ident->common); + SAMPLEHOLD_PERCENTAGE, SAMPLEHOLD_OVERFACTOR, &ident->common); ident->table_sample_function = (int (*)(void *, const key_flow *)) sampled_table_sample; @@ -575,6 +673,18 @@ static identity_t *new_identity(ident_config *config) { break; } +#ifdef SHADOW_ACCTING + + ident->shadow_table = standard_table_create(standard_hasher, &ident->shadow_common); + + if (ident->shadow_table == NULL) { + ident->table_destroy_function(ident->table); + free(ident); + return NULL; + } + +#endif + /* Make sure the table was allocated. */ if (ident->table == NULL) { free(ident); @@ -595,9 +705,33 @@ static identity_t *new_identity(ident_config *config) { ident->comm.remote_nodes = comm_nodes; + if (!create_htb.u.value) { + ident->htb_node = config->htb_node; + ident->htb_parent = config->htb_parent; + } + return ident; } +static int validate_htb_exists(int node, int parent) { + FILE *pipe = popen("/sbin/tc class show dev eth0", "r"); + char line[200]; + + while (fgets(line, 200, pipe) != NULL) { + int n, p; + char ignore[200]; + + sscanf(line, "class htb 1:%x parent 1:%x prio %s", &n, &p, ignore); + if (n == node && p == parent) { + pclose(pipe); + return 0; + } + } + + pclose(pipe); + return 1; +} + /* Determines the validity of the parameters of one ident_config. * * 0 valid @@ -621,10 +755,11 @@ static int validate_config(ident_config *config) { } /* Accounting must be a valid choice (ACT_STANDARD, ACT_SAMPLEHOLD, - * ACT_SIMPLE). */ + * ACT_SIMPLE, ACT_MULTIPLE). */ if (config->accounting != ACT_STANDARD && config->accounting != ACT_SAMPLEHOLD && - config->accounting != ACT_SIMPLE) { + config->accounting != ACT_SIMPLE && + config->accounting != ACT_MULTIPLE) { return 1; } @@ -633,6 +768,22 @@ static int validate_config(ident_config *config) { return 1; } + if (!create_htb.u.value) { + if (config->htb_node < 0 || config->htb_parent < 0) { + printlog(LOG_CRITICAL, "When create_htb is disabled in ulogd.conf, an identity must specify the htb_node and htb_parent propertities in its configuration.\n"); + return 1; + } + + if (validate_htb_exists(config->htb_node, config->htb_parent)) { + printlog(LOG_CRITICAL, "Identity specified htb node %x with parent %x. No such node/parent combo seems to exist!\n", config->htb_node, config->htb_parent); + return 1; + } + } else { + if (config->htb_node > -1 || config->htb_parent > -1) { + printlog(LOG_WARN, "htb_node or htb_parent are configured but ignored because we're configured to create our own htb hierarchy.\n"); + } + } + /* Note: Parsing stage requires that each ident has at least one peer. */ return 0; } @@ -677,11 +828,17 @@ static int validate_configs(parsed_configs configs, drl_instance_t *instance) { return EINVAL; } + if (mlist->independent) { + printlog(LOG_CRITICAL, "Makes no sense to have independent machine node - setting independent to false.\n"); + mlist->independent = 0; + } + mlist = mlist->next; } instance->sets = malloc(configs.set_count * sizeof(identity_t *)); if (instance->sets == NULL) { + printlog(LOG_CRITICAL, "Not enough memory to allocate set identity collection.\n"); return ENOMEM; } @@ -715,6 +872,7 @@ static int validate_configs(parsed_configs configs, drl_instance_t *instance) { instance->sets[i] = new_identity(slist); if (instance->sets[i] == NULL) { + printlog(LOG_CRITICAL, "Not enough memory to allocate set identity.\n"); return ENOMEM; } @@ -731,10 +889,12 @@ static int validate_configs(parsed_configs configs, drl_instance_t *instance) { child_leaf = map_search(instance->leaf_map, &members->value, sizeof(members->value)); if (child_leaf == NULL) { + printlog(LOG_CRITICAL, "xid: child leaf not found.\n"); return EINVAL; } if (child_leaf->parent != NULL) { /* Error - This leaf already has a parent. */ + printlog(LOG_CRITICAL, "xid: child already has a parent.\n"); return EINVAL; } child_leaf->parent = instance->sets[i]; @@ -743,10 +903,12 @@ static int validate_configs(parsed_configs configs, drl_instance_t *instance) { child_ident = map_search(instance->ident_map, &members->value, sizeof(members->value)); if (child_ident == NULL) { + printlog(LOG_CRITICAL, "guid: child identity not found.\n"); return EINVAL; } if (child_ident->parent != NULL) { /* Error - This identity already has a parent. */ + printlog(LOG_CRITICAL, "guid: child identity already has a parent.\n"); return EINVAL; } child_ident->parent = instance->sets[i]; @@ -823,12 +985,21 @@ static int init_identities(parsed_configs configs, drl_instance_t *instance) { /* Allocate and add the machine identities. */ for (i = 0; i < configs.machine_count; ++i) { + identity_action *loop_action; + identity_action *comm_action; instance->machines[i] = new_identity(config); if (instance->machines[i] == NULL) { return ENOMEM; } + loop_action = malloc(sizeof(identity_action)); + comm_action = malloc(sizeof(identity_action)); + + if (loop_action == NULL || comm_action == NULL) { + return ENOMEM; + } + /* The first has no parent - it is the root. All others have the * previous ident as their parent. */ if (i == 0) { @@ -845,8 +1016,23 @@ static int init_identities(parsed_configs configs, drl_instance_t *instance) { config = config->next; + memset(loop_action, 0, sizeof(identity_action)); + memset(comm_action, 0, sizeof(identity_action)); + loop_action->ident = instance->machines[i]; + loop_action->action = ACTION_MAINLOOP; + loop_action->valid = 1; + comm_action->ident = instance->machines[i]; + comm_action->action = ACTION_COMMUNICATE; + comm_action->valid = 1; + + instance->machines[i]->loop_action = loop_action; + instance->machines[i]->comm_action = comm_action; + + TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK), + loop_action, calendar); + TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK), - instance->machines[i], calendar); + comm_action, calendar); /* Setup the array of pointers to leaves. This is easy for machines * because a machine node applies to every leaf. */ @@ -874,12 +1060,37 @@ static int init_identities(parsed_configs configs, drl_instance_t *instance) { /* Sets... */ for (i = 0; i < instance->set_count; ++i) { - if (instance->sets[i]->parent == NULL) { + identity_action *loop_action; + identity_action *comm_action; + + if (instance->sets[i]->parent == NULL && instance->sets[i]->independent == 0) { instance->sets[i]->parent = instance->last_machine; } + loop_action = malloc(sizeof(identity_action)); + comm_action = malloc(sizeof(identity_action)); + + if (loop_action == NULL || comm_action == NULL) { + return ENOMEM; + } + + memset(loop_action, 0, sizeof(identity_action)); + memset(comm_action, 0, sizeof(identity_action)); + loop_action->ident = instance->sets[i]; + loop_action->action = ACTION_MAINLOOP; + loop_action->valid = 1; + comm_action->ident = instance->sets[i]; + comm_action->action = ACTION_COMMUNICATE; + comm_action->valid = 1; + + instance->sets[i]->loop_action = loop_action; + instance->sets[i]->comm_action = comm_action; + + TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK), + loop_action, calendar); + TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK), - instance->sets[i], calendar); + comm_action, calendar); /* Setup the array of pointers to leaves. This is harder for sets, * but this doesn't need to be super-efficient because it happens @@ -915,7 +1126,15 @@ static void print_instance(drl_instance_t *instance) { static int assign_htb_hierarchy(drl_instance_t *instance) { int i, j; - int next_node = 0x11; + int next_node = 0x100; + + /* If we're not going to create our own htb hierarchy (for instance, + * if we're going to let PL's node manager do it for us), then we don't + * want this function to do anything. */ + if (!create_htb.u.value) { + printlog(LOG_DEBUG, "Skipping assign_htb_hierarchy becase ulogd.conf's create_htb set to 0.\n"); + return 0; + } /* Chain machine nodes under 1:10. */ for (i = 0; i < instance->machine_count; ++i) { @@ -938,6 +1157,7 @@ static int assign_htb_hierarchy(drl_instance_t *instance) { * already there. */ for (j = (instance->set_count - 1); j >= 0; --j) { if (instance->sets[j]->parent == NULL) { + /* Independent node - goes under 0x10 away from machine nodes. */ instance->sets[j]->htb_parent = 0x10; } else { instance->sets[j]->htb_parent = instance->sets[j]->parent->htb_node; @@ -952,13 +1172,68 @@ static int assign_htb_hierarchy(drl_instance_t *instance) { /* Added this so that I could comment one line and kill off all of the * command execution. */ -static int execute_cmd(const char *cmd) { +static inline int execute_cmd(const char *cmd) { return system(cmd); } +static inline int add_htb_node(const char *iface, const uint32_t parent_major, const uint32_t parent_minor, + const uint32_t classid_major, const uint32_t classid_minor, + const uint64_t rate, const uint64_t ceil) { + char cmd[300]; + + sprintf(cmd, "tc class add dev %s parent %x:%x classid %x:%x htb rate %llubit ceil %llubit", + iface, parent_major, parent_minor, classid_major, classid_minor, rate, ceil); + printlog(LOG_WARN, "INIT: HTB_cmd: %s\n", cmd); + + return execute_cmd(cmd); +} + +static inline int add_htb_netem(const char *iface, const uint32_t parent_major, + const uint32_t parent_minor, const uint32_t handle, + const int loss, const int delay) { + char cmd[300]; + + sprintf(cmd, "/sbin/tc qdisc del dev %s parent %x:%x handle %x pfifo", iface, parent_major, + parent_minor, handle); + printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); + if (execute_cmd(cmd)) + printlog(LOG_DEBUG, "HTB_cmd: Previous deletion did not succeed.\n"); + + sprintf(cmd, "/sbin/tc qdisc replace dev %s parent %x:%x handle %x netem loss %d%% delay %dms", + iface, parent_major, parent_minor, handle, loss, delay); + printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); + return execute_cmd(cmd); +} + +static inline int add_htb_sfq(const char *iface, const uint32_t parent_major, + const uint32_t parent_minor, const uint32_t handle, + const int perturb) { + char cmd[300]; + + sprintf(cmd, "/sbin/tc qdisc del dev %s parent %x:%x handle %x pfifo", iface, parent_major, + parent_minor, handle); + printlog(LOG_WARN, "HTB_cmd: %s\n", cmd); + if (execute_cmd(cmd)) + printlog(LOG_WARN, "HTB_cmd: Previous deletion did not succeed.\n"); + + sprintf(cmd, "/sbin/tc qdisc replace dev %s parent %x:%x handle %x sfq perturb %d", + iface, parent_major, parent_minor, handle, perturb); + printlog(LOG_WARN, "HTB_cmd: %s\n", cmd); + return execute_cmd(cmd); +} + static int create_htb_hierarchy(drl_instance_t *instance) { char cmd[300]; int i, j, k; + uint64_t gigabit = 1024 * 1024 * 1024; + + /* If we're not going to create our own htb hierarchy (for instance, + * if we're going to let PL's node manager do it for us), then we don't + * want this function to do anything. */ + if (!create_htb.u.value) { + printlog(LOG_DEBUG, "Skipping create_htb_hierarchy becase ulogd.conf's create_htb set to 0.\n"); + return 0; + } /* Nuke the hierarchy. */ sprintf(cmd, "tc qdisc del dev eth0 root handle 1: htb"); @@ -971,160 +1246,151 @@ static int create_htb_hierarchy(drl_instance_t *instance) { return 1; } printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); - sprintf(cmd, "tc class add dev eth0 parent 1: classid 1:1 htb rate 1000mbit ceil 1000mbit"); - if (execute_cmd(cmd)) { + + if (add_htb_node("eth0", 1, 0, 1, 1, gigabit, gigabit)) return 1; - } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); - /* Add back 1:10. (Nodelimit : Megabits/sec -> bits/second)*/ + /* Add back 1:10. (Nodelimit : kilobits/sec -> bits/second)*/ if (limiter.nodelimit) { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:10 htb rate 8bit ceil %lubit", - (unsigned long) limiter.nodelimit * 1024 * 1024); + if (add_htb_node("eth0", 1, 1, 1, 0x10, 8, (uint64_t) limiter.nodelimit * 1024)) + return 1; } else { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:10 htb rate 8bit ceil 1000mbit"); + if (add_htb_node("eth0", 1, 1, 1, 0x10, 8, gigabit)) + return 1; } - if (execute_cmd(cmd)) { - return 1; + /* Add machines. */ + for (i = 0; i < instance->machine_count; ++i) { + if (add_htb_node("eth0", 1, instance->machines[i]->htb_parent, 1, + instance->machines[i]->htb_node, 8, instance->machines[i]->limit * 1024)) { + return 1; + } } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); + +#define LIMITEXEMPT /* Add back 1:20. */ +#ifdef LIMITEXEMPT + if (instance->last_machine == NULL) { + sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:20 htb rate 8bit ceil 1000mbit"); + } else { + sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:20 htb rate 8bit ceil 1000mbit", + instance->last_machine->htb_node); + } +#else sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:20 htb rate 8bit ceil 1000mbit"); +#endif if (execute_cmd(cmd)) { return 1; } printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); - - /* Add machines. */ - for (i = 0; i < instance->machine_count; ++i) { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %lubit", - instance->machines[i]->htb_parent, - instance->machines[i]->htb_node, - (unsigned long) instance->machines[i]->limit * 1024 * 1024); - - if (execute_cmd(cmd)) { - return 1; - } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); - } - /* Add sets. */ for (j = (instance->set_count - 1); j >= 0; --j) { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %lubit", - instance->sets[j]->htb_parent, - instance->sets[j]->htb_node, - (unsigned long) instance->sets[j]->limit * 1024 * 1024); - - if (execute_cmd(cmd)) { + if (add_htb_node("eth0", 1, instance->sets[j]->htb_parent, 1, + instance->sets[j]->htb_node, 8, instance->sets[j]->limit * 1024)) { return 1; } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); } /* Add leaves. FIXME: Set static sliver limit as ceil here! */ for (k = 0; k < instance->leaf_count; ++k) { if (instance->leaves[k].parent == NULL) { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:10 classid 1:1%x htb rate 8bit ceil %lubit", - instance->leaves[k].xid, - (unsigned long) 100 * 1024 * 1024); + if (add_htb_node("eth0", 1, 0x10, 1, (0x1000 | instance->leaves[k].xid), 8, gigabit)) + return 1; } else { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:1%x htb rate 8bit ceil %lubit", - instance->leaves[k].parent->htb_node, - instance->leaves[k].xid, - (unsigned long) 100 * 1024 * 1024); + if (add_htb_node("eth0", 1, instance->leaves[k].parent->htb_node, 1, (0x1000 | instance->leaves[k].xid), 8, gigabit)) + return 1; } - if (execute_cmd(cmd)) { + /* Add exempt node for the leaf under 1:20 as 1:2 */ + if (add_htb_node("eth0", 1, 0x20, 1, (0x2000 | instance->leaves[k].xid), 8, gigabit)) return 1; - } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); - - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:20 classid 1:2%x htb rate 8bit ceil 1000mbit", - instance->leaves[k].xid); - - if (execute_cmd(cmd)) { - return 1; - } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); } /* Add 1:1000 and 1:2000 */ if (instance->last_machine == NULL) { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:10 classid 1:1000 htb rate 8bit ceil 1000mbit"); + if (add_htb_node("eth0", 1, 0x10, 1, 0x1000, 8, gigabit)) + return 1; } else { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:1000 htb rate 8bit ceil 1000mbit", - instance->last_machine->htb_node); - } - - if (execute_cmd(cmd)) { - return 1; + if (add_htb_node("eth0", 1, instance->last_machine->htb_node, 1, 0x1000, 8, gigabit)) + return 1; } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:20 classid 1:2000 htb rate 8bit ceil 1000mbit"); - if (execute_cmd(cmd)) { + if (add_htb_node("eth0", 1, 0x20, 1, 0x2000, 8, gigabit)) return 1; - } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); /* Add 1:1fff and 1:2fff */ if (instance->last_machine == NULL) { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:10 classid 1:1fff htb rate 8bit ceil 1000mbit"); + if (add_htb_node("eth0", 1, 0x10, 1, 0x1fff, 8, gigabit)) + return 1; } else { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:1fff htb rate 8bit ceil 1000mbit", - instance->last_machine->htb_node); + if (add_htb_node("eth0", 1, instance->last_machine->htb_node, 1, 0x1fff, 8, gigabit)) + return 1; } - if (execute_cmd(cmd)) { + if (add_htb_node("eth0", 1, 0x20, 1, 0x2fff, 8, gigabit)) return 1; - } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:20 classid 1:2fff htb rate 8bit ceil 1000mbit"); - if (execute_cmd(cmd)) { - return 1; - } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); + /* Artifical delay or loss for experimentation. */ + if (netem_delay.u.value || netem_loss.u.value) { + if (!strcmp(netem_slice.u.string, "ALL")) { + /* By default, netem applies to all leaves. */ + if (add_htb_netem("eth0", 1, 0x1000, 0x1000, netem_loss.u.value, netem_delay.u.value)) + return 1; + if (add_htb_netem("eth0", 1, 0x1fff, 0x1fff, netem_loss.u.value, netem_delay.u.value)) + return 1; + + for (k = 0; k < instance->leaf_count; ++k) { + if (add_htb_netem("eth0", 1, (0x1000 | instance->leaves[k].xid), + (0x1000 | instance->leaves[k].xid), netem_loss.u.value, netem_delay.u.value)) { + return 1; + } -#ifdef DELAY40MS - /* Only for artificial delay testing. */ - sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo"); - execute_cmd(cmd); + //FIXME: add exempt delay/loss here on 0x2000 ... ? + } + } else { + /* netem_slice is not the default ALL value. Only apply netem + * to the slice that is set in netem_slice.u.string. */ + uint32_t slice_xid; - sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 40ms"); - execute_cmd(cmd); - sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:11f9 handle 11f9 pfifo"); - execute_cmd(cmd); + sscanf(netem_slice.u.string, "%x", &slice_xid); - sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:11f9 handle 11f9 netem loss 0 delay 40ms"); - execute_cmd(cmd); - sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:11fa handle 11fa pfifo"); - execute_cmd(cmd); + if (add_htb_netem("eth0", 1, slice_xid, slice_xid, netem_loss.u.value, netem_delay.u.value)) + return 1; + } + } - sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:11fa handle 11fa netem loss 0 delay 40ms"); - execute_cmd(cmd); - /* End delay testing */ -#endif + /* Turn on SFQ for experimentation. */ + if (strcmp(sfq_slice.u.string, "NONE")) { + if (!strcmp(sfq_slice.u.string, "ALL")) { + if (add_htb_sfq("eth0", 1, 0x1000, 0x1000, 30)) + return 1; + if (add_htb_sfq("eth0", 1, 0x1fff, 0x1fff, 30)) + return 1; -//#define SFQTEST + for (k = 0; k < instance->leaf_count; ++k) { + if (add_htb_sfq("eth0", 1, (0x1000 | instance->leaves[k].xid), + (0x1000 | instance->leaves[k].xid), 30)) { + return 1; + } + } + } else { + uint32_t slice_xid; -#ifdef SFQTEST - sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo"); - execute_cmd(cmd); + sscanf(sfq_slice.u.string, "%x", &slice_xid); - sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 sfq perturb 20"); - execute_cmd(cmd); -#endif + if (add_htb_sfq("eth0", 1, slice_xid, slice_xid, 30)) + return 1; + } + } return 0; } static int setup_tc_grd(drl_instance_t *instance) { - int i; + int i, j; char cmd[300]; for (i = 0; i < instance->leaf_count; ++i) { @@ -1137,15 +1403,11 @@ static int setup_tc_grd(drl_instance_t *instance) { } /* Add the netem qdisc. */ -#ifdef DELAY40MS - sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 40ms", - instance->leaves[i].xid, instance->leaves[i].xid); -#else sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 0ms", instance->leaves[i].xid, instance->leaves[i].xid); -#endif if (execute_cmd(cmd)) { + printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd); return 1; } } @@ -1158,13 +1420,10 @@ static int setup_tc_grd(drl_instance_t *instance) { } /* Add the netem qdisc. */ -#ifdef DELAY40MS - sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 40ms"); -#else sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 0ms"); -#endif if (execute_cmd(cmd)) { + printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd); return 1; } @@ -1175,16 +1434,65 @@ static int setup_tc_grd(drl_instance_t *instance) { } /* Add the netem qdisc. */ -#ifdef DELAY40MS - sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 40ms"); -#else sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 0ms"); -#endif if (execute_cmd(cmd)) { + printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd); return 1; } + /* Artifical delay or loss for experimentation. */ + if (netem_delay.u.value || netem_loss.u.value) { + if (!strcmp(netem_slice.u.string, "ALL")) { + sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1000 handle 1000 netem loss %d delay %dms", netem_loss.u.value, netem_delay.u.value); + if (execute_cmd(cmd)) { + printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd); + return 1; + } + + sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1fff handle 1fff netem loss %d delay %dms", netem_loss.u.value, netem_delay.u.value); + if (execute_cmd(cmd)) { + printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd); + return 1; + } + + for (j = 0; j < instance->leaf_count; ++j) { + leaf_t *current = &instance->leaves[j]; + + current->delay = netem_delay.u.value; + + sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %d delay %dms", current->xid, current->xid, netem_loss.u.value, netem_delay.u.value); + + if (execute_cmd(cmd)) { + printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd); + return 1; + } + } + } else { + uint32_t slice_xid; + leaf_t *leaf = NULL; + + sscanf(netem_slice.u.string, "%x", &slice_xid); + + leaf = (leaf_t *) map_search(instance->leaf_map, &slice_xid, sizeof(slice_xid)); + + if (leaf == NULL) { + /* Leaf not found - invalid selection. */ + printf("Your experimental setup is incorrect...\n"); + return 1; + } + + leaf->delay = netem_delay.u.value; + + sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %d delay %dms", slice_xid, slice_xid, netem_loss.u.value, netem_delay.u.value); + + if (execute_cmd(cmd)) { + printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd); + return 1; + } + } + } + return 0; } @@ -1218,10 +1526,10 @@ static int init_drl(void) { pthread_rwlock_init(&limiter.limiter_lock,NULL); /* determine our local IP by iterating through interfaces */ - if ((limiter.ip = get_local_ip())==0) { - printlog(LOG_CRITICAL, - "ulogd_DRL unable to aquire local IP address, not registering.\n"); - return (false); + limiter.ip = get_local_ip(); + if (limiter.ip == NULL) { + printlog(LOG_CRITICAL, "ulogd_DRL unable to aquire local IP address, not registering.\n"); + return false; } limiter.localaddr = inet_addr(limiter.ip); limiter.port = htons(LIMITER_LISTEN_PORT); @@ -1303,7 +1611,6 @@ static int init_drl(void) { free_ident_list(configs.machines); free_ident_list(configs.sets); - /* Debugging - FIXME: remove this? */ print_instance(&limiter.stable_instance); switch (limiter.policy) { @@ -1333,6 +1640,8 @@ static int init_drl(void) { return false; } + partition_set = partition.u.value; + pthread_rwlock_unlock(&limiter.limiter_lock); if (pthread_create(&limiter.udp_recv_thread, NULL, limiter_receive_thread, NULL)) { @@ -1389,7 +1698,6 @@ static void reconfig() { free_ident_list(configs.machines); free_ident_list(configs.sets); - /* Debugging - FIXME: remove this? */ print_instance(&limiter.new_instance); /* Lock */ @@ -1451,32 +1759,6 @@ static void reconfig() { pthread_rwlock_unlock(&limiter.limiter_lock); } -static ulog_output_t drl_op = { - .name = "drl", - .output = &_output_drl, - .signal = NULL, /* This appears to be broken. Using my own handler. */ - .init = NULL, - .fini = NULL, -}; - -/* Tests the amount of time it takes to call reconfig(). */ -static void time_reconfig(int iterations) { - struct timeval start, end; - int i; - - gettimeofday(&start, NULL); - for (i = 0; i < iterations; ++i) { - reconfig(); - } - gettimeofday(&end, NULL); - - printf("%d reconfigs() took %d seconds and %d microseconds.\n", - iterations, end.tv_sec - start.tv_sec, end.tv_usec - start.tv_usec); - exit(0); - - // Seems to take about 85ms / iteration -} - static int stop_enforcement(drl_instance_t *instance) { char cmd[300]; int i; @@ -1512,26 +1794,35 @@ static void *signal_thread_func(void *args) { sigemptyset(&sigs); sigaddset(&sigs, SIGHUP); sigaddset(&sigs, SIGUSR1); + sigaddset(&sigs, SIGUSR2); + sigaddset(&sigs, SIGRTMAX); pthread_sigmask(SIG_BLOCK, &sigs, NULL); while (1) { sigemptyset(&sigs); - sigaddset(&sigs, SIGHUP); + //sigaddset(&sigs, SIGHUP); sigaddset(&sigs, SIGUSR1); + sigaddset(&sigs, SIGUSR2); + sigaddset(&sigs, SIGRTMAX); err = sigwait(&sigs, &sig); if (err) { printlog(LOG_CRITICAL, "sigwait() returned an error.\n"); flushlog(); + continue; + } + + if (sig == SIGRTMAX) { + printf("Caught SIGRTMAX - toggling fake partitions.\n"); + do_partition = !do_partition; + continue; } switch (sig) { case SIGHUP: - printlog(LOG_WARN, "Caught SIGHUP - re-reading XML file.\n"); - reconfig(); - //time_reconfig(1000); /* instrumentation */ - flushlog(); + printlog(LOG_CRITICAL, "Caught SIGHUP in signal_thread_func?!?\n"); + printf("Caught SIGHUP in signal_thread_func?!?\n"); break; case SIGUSR1: pthread_rwlock_wrlock(&limiter.limiter_lock); @@ -1545,23 +1836,27 @@ static void *signal_thread_func(void *args) { } pthread_rwlock_unlock(&limiter.limiter_lock); break; + case SIGUSR2: + printlog(LOG_WARN, "Caught SIGUSR2 - re-reading XML file.\n"); + printf("Caught SIGUSR2 - re-reading XML file.\n"); + reconfig(); + flushlog(); + break; default: /* Intentionally blank. */ break; } } - } -/* register output plugin with ulogd */ -static void _drl_reg_op(void) -{ - ulog_output_t *op = &drl_op; +static int drl_plugin_init() { sigset_t signal_mask; sigemptyset(&signal_mask); - sigaddset(&signal_mask, SIGHUP); + //sigaddset(&signal_mask, SIGHUP); sigaddset(&signal_mask, SIGUSR1); + sigaddset(&signal_mask, SIGUSR2); + sigaddset(&signal_mask, SIGRTMAX); pthread_sigmask(SIG_BLOCK, &signal_mask, NULL); if (pthread_create(&signal_thread, NULL, &signal_thread_func, NULL) != 0) { @@ -1578,8 +1873,6 @@ static void _drl_reg_op(void) exit(EXIT_FAILURE); } - register_output(op); - /* start up the thread that will periodically estimate the * local rate and set the local limits * see estimate.c @@ -1589,6 +1882,62 @@ static void _drl_reg_op(void) fprintf(stderr, "An error has occured starting ulogd_DRL. Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string); exit(EXIT_FAILURE); } + + if (enforce_on.u.value) { + pthread_rwlock_wrlock(&limiter.limiter_lock); + do_enforcement = 1; + printlog(LOG_CRITICAL, "--Switching enforcement on.--\n"); + pthread_rwlock_unlock(&limiter.limiter_lock); + } + + return 0; +} + +static void drl_signal(int sig) { + if (sig == SIGHUP) { + printf("Caught SIGHUP - reopening DRL log file.\n"); + + fclose(logfile); + logfile = fopen(drl_logfile.u.string, "a"); + printlog(LOG_CRITICAL, "Reopened logfile.\n"); + } else { + printlog(LOG_WARN, "Caught unexpected signal %d in drl_signal.\n", sig); + } +} + +static ulog_output_t drl_op = { + .name = "drl", + .output = &_output_drl, + .signal = &drl_signal, + .init = &drl_plugin_init, + .fini = NULL, +}; + +#if 0 +/* Tests the amount of time it takes to call reconfig(). */ +static void time_reconfig(int iterations) { + struct timeval start, end; + int i; + + gettimeofday(&start, NULL); + for (i = 0; i < iterations; ++i) { + reconfig(); + } + gettimeofday(&end, NULL); + + printf("%d reconfigs() took %d seconds and %d microseconds.\n", + iterations, end.tv_sec - start.tv_sec, end.tv_usec - start.tv_usec); + exit(0); + + // Seems to take about 85ms / iteration +} +#endif + +/* register output plugin with ulogd */ +static void _drl_reg_op(void) +{ + ulog_output_t *op = &drl_op; + register_output(op); } void _init(void)