X-Git-Url: http://git.onelab.eu/?p=distributedratelimiting.git;a=blobdiff_plain;f=drl%2Fulogd_DRL.c;h=f785d301b88892a3337278f3a11d72631bf7afbb;hp=14de4fe2f3f0084bff069d2291285b3ba549d82e;hb=f83340496f632165030cc92cd98408a87082f6b1;hpb=74f52acf84cbf11faab8aa53e069464063ce11b9 diff --git a/drl/ulogd_DRL.c b/drl/ulogd_DRL.c index 14de4fe..f785d30 100644 --- a/drl/ulogd_DRL.c +++ b/drl/ulogd_DRL.c @@ -115,8 +115,8 @@ /* DRL specifics */ #include "raterouter.h" #include "util.h" -#include "calendar.h" #include "ratetypes.h" /* needs util and pthread.h */ +#include "calendar.h" #include "logging.h" /* @@ -124,8 +124,40 @@ * Add the config options for DRL. */ -static config_entry_t drl_configfile = { +static config_entry_t partition = { .next = NULL, + .key = "partition_set", + .type = CONFIG_TYPE_INT, + .options = CONFIG_OPT_NONE, + .u = { .value = 0xfffffff }, +}; + +static config_entry_t netem_slice = { + .next = &partition, + .key = "netem_slice", + .type = CONFIG_TYPE_STRING, + .options = CONFIG_OPT_NONE, + .u = { .string = "ALL" }, +}; + +static config_entry_t netem_loss = { + .next = &netem_slice, + .key = "netem_loss", + .type = CONFIG_TYPE_INT, + .options = CONFIG_OPT_NONE, + .u = { .value = 0 }, +}; + +static config_entry_t netem_delay = { + .next = &netem_loss, + .key = "netem_delay", + .type = CONFIG_TYPE_INT, + .options = CONFIG_OPT_NONE, + .u = { .value = 0 }, +}; + +static config_entry_t drl_configfile = { + .next = &netem_delay, .key = "drl_configfile", .type = CONFIG_TYPE_STRING, .options = CONFIG_OPT_MANDATORY, @@ -231,25 +263,34 @@ extern FILE *logfile; extern uint8_t system_loglevel; extern uint8_t do_enforcement; +/* From peer_comm.c - used to simulate partition. */ +extern int do_partition; +extern int partition_set; + /* functions */ static inline uint32_t -hash_flow(uint8_t protocol, uint32_t src_ip, uint16_t src_port, uint32_t dst_ip, uint16_t dst_port) +hash_flow(uint8_t protocol, uint32_t src_ip, uint16_t src_port, uint32_t dst_ip, uint16_t dst_port, uint32_t hash_max) { unsigned char mybytes[FLOWKEYSIZE]; mybytes[0] = protocol; *(uint32_t*)(&(mybytes[1])) = src_ip; *(uint32_t*)(&(mybytes[5])) = dst_ip; *(uint32_t*)(&(mybytes[9])) = (src_port << 16) | dst_port; - return jhash(mybytes,FLOWKEYSIZE,salt) & (FLOW_HASH_SIZE - 1); + return jhash(mybytes,FLOWKEYSIZE,salt) & (hash_max - 1); } uint32_t sampled_hasher(const key_flow *key) { - return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port); + /* Last arg is UINT_MAX because sampled flow keeps track of its own capacity. */ + return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, UINT_MAX); } uint32_t standard_hasher(const key_flow *key) { - return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port); + return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, STD_FLOW_HASH_SIZE); +} + +uint32_t multiple_hasher(const key_flow *key) { + return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, MUL_FLOW_HASH_SIZE); } struct intr_id { @@ -418,6 +459,13 @@ static int _output_drl(ulog_iret_t *res) /* Update the identity's table. */ ident->table_sample_function(ident->table, &key); +#ifdef SHADOW_ACCTING + + /* Update the shadow perfect copy of the accounting table. */ + standard_table_sample((standard_flow_table) ident->shadow_table, &key); + +#endif + pthread_mutex_unlock(&ident->table_mutex); ident = ident->parent; @@ -455,6 +503,14 @@ static void free_identity(identity_t *ident) { ident->table_destroy_function(ident->table); } + if (ident->loop_action) { + ident->loop_action->valid = 0; + } + + if (ident->comm_action) { + ident->comm_action->valid = 0; + } + pthread_mutex_destroy(&ident->table_mutex); free(ident); @@ -483,6 +539,8 @@ static void free_instance(drl_instance_t *instance) { free(instance->machines); if (instance->sets) free(instance->sets); + + /* FIXME: Drain the calendar first and free all the entries. */ if (instance->cal) { free(instance->cal); } @@ -523,9 +581,10 @@ static identity_t *new_identity(ident_config *config) { ident->id = config->id; ident->limit = (uint32_t) (((double) config->limit * 1000.0) / 8.0); ident->fixed_ewma_weight = config->fixed_ewma_weight; - ident->intervals = config->intervals; + ident->communication_intervals = config->communication_intervals; + ident->mainloop_intervals = config->mainloop_intervals; ident->ewma_weight = pow(ident->fixed_ewma_weight, - (limiter.estintms/1000.0) * config->intervals); + (limiter.estintms/1000.0) * config->mainloop_intervals); ident->parent = NULL; pthread_mutex_init(&ident->table_mutex, NULL); @@ -546,10 +605,24 @@ static identity_t *new_identity(ident_config *config) { (void (*)(void *)) standard_table_destroy; break; + case ACT_MULTIPLE: + ident->table = + multiple_table_create(multiple_hasher, MUL_INTERVAL_COUNT, &ident->common); + + ident->table_sample_function = + (int (*)(void *, const key_flow *)) multiple_table_sample; + ident->table_cleanup_function = + (int (*)(void *)) multiple_table_cleanup; + ident->table_update_function = + (void (*)(void *, struct timeval, double)) multiple_table_update_flows; + ident->table_destroy_function = + (void (*)(void *)) multiple_table_destroy; + break; + case ACT_SAMPLEHOLD: ident->table = sampled_table_create(sampled_hasher, ident->limit * IDENT_CLEAN_INTERVAL, - 1, 20, &ident->common); + SAMPLEHOLD_PERCENTAGE, SAMPLEHOLD_OVERFACTOR, &ident->common); ident->table_sample_function = (int (*)(void *, const key_flow *)) sampled_table_sample; @@ -575,6 +648,18 @@ static identity_t *new_identity(ident_config *config) { break; } +#ifdef SHADOW_ACCTING + + ident->shadow_table = standard_table_create(standard_hasher, &ident->shadow_common); + + if (ident->shadow_table == NULL) { + ident->table_destroy_function(ident->table); + free(ident); + return NULL; + } + +#endif + /* Make sure the table was allocated. */ if (ident->table == NULL) { free(ident); @@ -621,10 +706,11 @@ static int validate_config(ident_config *config) { } /* Accounting must be a valid choice (ACT_STANDARD, ACT_SAMPLEHOLD, - * ACT_SIMPLE). */ + * ACT_SIMPLE, ACT_MULTIPLE). */ if (config->accounting != ACT_STANDARD && config->accounting != ACT_SAMPLEHOLD && - config->accounting != ACT_SIMPLE) { + config->accounting != ACT_SIMPLE && + config->accounting != ACT_MULTIPLE) { return 1; } @@ -823,12 +909,21 @@ static int init_identities(parsed_configs configs, drl_instance_t *instance) { /* Allocate and add the machine identities. */ for (i = 0; i < configs.machine_count; ++i) { + identity_action *loop_action; + identity_action *comm_action; instance->machines[i] = new_identity(config); if (instance->machines[i] == NULL) { return ENOMEM; } + loop_action = malloc(sizeof(identity_action)); + comm_action = malloc(sizeof(identity_action)); + + if (loop_action == NULL || comm_action == NULL) { + return ENOMEM; + } + /* The first has no parent - it is the root. All others have the * previous ident as their parent. */ if (i == 0) { @@ -845,8 +940,23 @@ static int init_identities(parsed_configs configs, drl_instance_t *instance) { config = config->next; + memset(loop_action, 0, sizeof(identity_action)); + memset(comm_action, 0, sizeof(identity_action)); + loop_action->ident = instance->machines[i]; + loop_action->action = ACTION_MAINLOOP; + loop_action->valid = 1; + comm_action->ident = instance->machines[i]; + comm_action->action = ACTION_COMMUNICATE; + comm_action->valid = 1; + + instance->machines[i]->loop_action = loop_action; + instance->machines[i]->comm_action = comm_action; + TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK), - instance->machines[i], calendar); + loop_action, calendar); + + TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK), + comm_action, calendar); /* Setup the array of pointers to leaves. This is easy for machines * because a machine node applies to every leaf. */ @@ -874,12 +984,37 @@ static int init_identities(parsed_configs configs, drl_instance_t *instance) { /* Sets... */ for (i = 0; i < instance->set_count; ++i) { + identity_action *loop_action; + identity_action *comm_action; + if (instance->sets[i]->parent == NULL) { instance->sets[i]->parent = instance->last_machine; } + loop_action = malloc(sizeof(identity_action)); + comm_action = malloc(sizeof(identity_action)); + + if (loop_action == NULL || comm_action == NULL) { + return ENOMEM; + } + + memset(loop_action, 0, sizeof(identity_action)); + memset(comm_action, 0, sizeof(identity_action)); + loop_action->ident = instance->sets[i]; + loop_action->action = ACTION_MAINLOOP; + loop_action->valid = 1; + comm_action->ident = instance->sets[i]; + comm_action->action = ACTION_COMMUNICATE; + comm_action->valid = 1; + + instance->sets[i]->loop_action = loop_action; + instance->sets[i]->comm_action = comm_action; + + TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK), + loop_action, calendar); + TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK), - instance->sets[i], calendar); + comm_action, calendar); /* Setup the array of pointers to leaves. This is harder for sets, * but this doesn't need to be super-efficient because it happens @@ -915,7 +1050,7 @@ static void print_instance(drl_instance_t *instance) { static int assign_htb_hierarchy(drl_instance_t *instance) { int i, j; - int next_node = 0x11; + int next_node = 0x100; /* Chain machine nodes under 1:10. */ for (i = 0; i < instance->machine_count; ++i) { @@ -952,13 +1087,43 @@ static int assign_htb_hierarchy(drl_instance_t *instance) { /* Added this so that I could comment one line and kill off all of the * command execution. */ -static int execute_cmd(const char *cmd) { +static inline int execute_cmd(const char *cmd) { return system(cmd); } +static inline int add_htb_node(const char *iface, const uint32_t parent_major, const uint32_t parent_minor, + const uint32_t classid_major, const uint32_t classid_minor, + const uint64_t rate, const uint64_t ceil) { + char cmd[300]; + + sprintf(cmd, "tc class add dev %s parent %x:%x classid %x:%x htb rate %llubit ceil %llubit", + iface, parent_major, parent_minor, classid_major, classid_minor, rate, ceil); + printlog(LOG_WARN, "INIT: HTB_cmd: %s\n", cmd); + + return execute_cmd(cmd); +} + +static inline int add_htb_netem(const char *iface, const uint32_t parent_major, + const uint32_t parent_minor, const uint32_t handle, + const int loss, const int delay) { + char cmd[300]; + + sprintf(cmd, "/sbin/tc qdisc del dev %s parent %x:%x handle %x pfifo", iface, parent_major, + parent_minor, handle); + printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); + if (execute_cmd(cmd)) + printlog(LOG_DEBUG, "HTB_cmd: Previous deletion did not succeed.\n"); + + sprintf(cmd, "/sbin/tc qdisc replace dev %s parent %x:%x handle %x netem loss %d%% delay %dms", + iface, parent_major, parent_minor, handle, loss, delay); + printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); + return execute_cmd(cmd); +} + static int create_htb_hierarchy(drl_instance_t *instance) { char cmd[300]; int i, j, k; + uint64_t gigabit = 1024 * 1024 * 1024; /* Nuke the hierarchy. */ sprintf(cmd, "tc qdisc del dev eth0 root handle 1: htb"); @@ -971,36 +1136,25 @@ static int create_htb_hierarchy(drl_instance_t *instance) { return 1; } printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); - sprintf(cmd, "tc class add dev eth0 parent 1: classid 1:1 htb rate 1000mbit ceil 1000mbit"); - if (execute_cmd(cmd)) { + + if (add_htb_node("eth0", 1, 0, 1, 1, gigabit, gigabit)) return 1; - } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); - /* Add back 1:10. (Nodelimit : Megabits/sec -> bits/second)*/ + /* Add back 1:10. (Nodelimit : kilobits/sec -> bits/second)*/ if (limiter.nodelimit) { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:10 htb rate 8bit ceil %lubit", - (unsigned long) limiter.nodelimit * 1024 * 1024); + if (add_htb_node("eth0", 1, 1, 1, 0x10, 8, (uint64_t) limiter.nodelimit * 1024)) + return 1; } else { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:10 htb rate 8bit ceil 1000mbit"); - } - - if (execute_cmd(cmd)) { - return 1; + if (add_htb_node("eth0", 1, 1, 1, 0x10, 8, gigabit)) + return 1; } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); /* Add machines. */ for (i = 0; i < instance->machine_count; ++i) { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %lubit", - instance->machines[i]->htb_parent, - instance->machines[i]->htb_node, - (unsigned long) instance->machines[i]->limit * 1024 * 1024); - - if (execute_cmd(cmd)) { + if (add_htb_node("eth0", 1, instance->machines[i]->htb_parent, 1, + instance->machines[i]->htb_node, 8, instance->machines[i]->limit * 1024)) { return 1; } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); } #define LIMITEXEMPT @@ -1024,82 +1178,81 @@ static int create_htb_hierarchy(drl_instance_t *instance) { /* Add sets. */ for (j = (instance->set_count - 1); j >= 0; --j) { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %lubit", - instance->sets[j]->htb_parent, - instance->sets[j]->htb_node, - (unsigned long) instance->sets[j]->limit * 1024 * 1024); - - if (execute_cmd(cmd)) { + if (add_htb_node("eth0", 1, instance->sets[j]->htb_parent, 1, + instance->sets[j]->htb_node, 8, instance->sets[j]->limit * 1024)) { return 1; } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); } /* Add leaves. FIXME: Set static sliver limit as ceil here! */ for (k = 0; k < instance->leaf_count; ++k) { if (instance->leaves[k].parent == NULL) { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:10 classid 1:1%x htb rate 8bit ceil %lubit", - instance->leaves[k].xid, - (unsigned long) 100 * 1024 * 1024); + if (add_htb_node("eth0", 1, 0x10, 1, (0x1000 | instance->leaves[k].xid), 8, gigabit)) + return 1; } else { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:1%x htb rate 8bit ceil %lubit", - instance->leaves[k].parent->htb_node, - instance->leaves[k].xid, - (unsigned long) 100 * 1024 * 1024); - } - - if (execute_cmd(cmd)) { - return 1; + if (add_htb_node("eth0", 1, instance->leaves[k].parent->htb_node, 1, (0x1000 | instance->leaves[k].xid), 8, gigabit)) + return 1; } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); - - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:20 classid 1:2%x htb rate 8bit ceil 1000mbit", - instance->leaves[k].xid); - if (execute_cmd(cmd)) { + /* Add exempt node for the leaf under 1:20 as 1:2 */ + if (add_htb_node("eth0", 1, 0x20, 1, (0x2000 | instance->leaves[k].xid), 8, gigabit)) return 1; - } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); } /* Add 1:1000 and 1:2000 */ if (instance->last_machine == NULL) { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:10 classid 1:1000 htb rate 8bit ceil 1000mbit"); + if (add_htb_node("eth0", 1, 0x10, 1, 0x1000, 8, gigabit)) + return 1; } else { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:1000 htb rate 8bit ceil 1000mbit", - instance->last_machine->htb_node); - } - - if (execute_cmd(cmd)) { - return 1; + if (add_htb_node("eth0", 1, instance->last_machine->htb_node, 1, 0x1000, 8, gigabit)) + return 1; } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:20 classid 1:2000 htb rate 8bit ceil 1000mbit"); - if (execute_cmd(cmd)) { + if (add_htb_node("eth0", 1, 0x20, 1, 0x2000, 8, gigabit)) return 1; - } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); /* Add 1:1fff and 1:2fff */ if (instance->last_machine == NULL) { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:10 classid 1:1fff htb rate 8bit ceil 1000mbit"); + if (add_htb_node("eth0", 1, 0x10, 1, 0x1fff, 8, gigabit)) + return 1; } else { - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:1fff htb rate 8bit ceil 1000mbit", - instance->last_machine->htb_node); + if (add_htb_node("eth0", 1, instance->last_machine->htb_node, 1, 0x1fff, 8, gigabit)) + return 1; } - if (execute_cmd(cmd)) { + if (add_htb_node("eth0", 1, 0x20, 1, 0x2fff, 8, gigabit)) return 1; - } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); - sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:20 classid 1:2fff htb rate 8bit ceil 1000mbit"); - if (execute_cmd(cmd)) { - return 1; + /* Artifical delay or loss for experimentation. */ + if (netem_delay.u.value || netem_loss.u.value) { + if (!strcmp(netem_slice.u.string, "ALL")) { + /* By default, netem applies to all leaves. */ + if (add_htb_netem("eth0", 1, 0x1000, 0x1000, netem_loss.u.value, netem_delay.u.value)) + return 1; + if (add_htb_netem("eth0", 1, 0x1fff, 0x1fff, netem_loss.u.value, netem_delay.u.value)) + return 1; + + for (k = 0; k < instance->leaf_count; ++k) { + if (add_htb_netem("eth0", 1, (0x1000 | instance->leaves[k].xid), + (0x1000 | instance->leaves[k].xid), netem_loss.u.value, netem_delay.u.value)) { + return 1; + } + + //FIXME: add exempt delay/loss here on 0x2000 ... ? + } + } else { + /* netem_slice is not the default ALL value. Only apply netem + * to the slice that is set in netem_slice.u.string. */ + uint32_t slice_xid; + + sscanf(netem_slice.u.string, "%x", &slice_xid); + + if (add_htb_netem("eth0", 1, slice_xid, slice_xid, netem_loss.u.value, netem_delay.u.value)) + return 1; + } } - printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); +#if 0 #ifdef DELAY40MS /* Only for artificial delay testing. */ sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo"); @@ -1118,6 +1271,7 @@ static int create_htb_hierarchy(drl_instance_t *instance) { sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:11fa handle 11fa netem loss 0 delay 40ms"); execute_cmd(cmd); /* End delay testing */ +#endif #endif return 0; @@ -1333,6 +1487,8 @@ static int init_drl(void) { return false; } + partition_set = partition.u.value; + pthread_rwlock_unlock(&limiter.limiter_lock); if (pthread_create(&limiter.udp_recv_thread, NULL, limiter_receive_thread, NULL)) { @@ -1512,12 +1668,14 @@ static void *signal_thread_func(void *args) { sigemptyset(&sigs); sigaddset(&sigs, SIGHUP); sigaddset(&sigs, SIGUSR1); + sigaddset(&sigs, SIGUSR2); pthread_sigmask(SIG_BLOCK, &sigs, NULL); while (1) { sigemptyset(&sigs); sigaddset(&sigs, SIGHUP); sigaddset(&sigs, SIGUSR1); + sigaddset(&sigs, SIGUSR2); err = sigwait(&sigs, &sig); @@ -1545,6 +1703,9 @@ static void *signal_thread_func(void *args) { } pthread_rwlock_unlock(&limiter.limiter_lock); break; + case SIGUSR2: + do_partition = !do_partition; + break; default: /* Intentionally blank. */ break; @@ -1562,6 +1723,7 @@ static void _drl_reg_op(void) sigemptyset(&signal_mask); sigaddset(&signal_mask, SIGHUP); sigaddset(&signal_mask, SIGUSR1); + sigaddset(&signal_mask, SIGUSR2); pthread_sigmask(SIG_BLOCK, &signal_mask, NULL); if (pthread_create(&signal_thread, NULL, &signal_thread_func, NULL) != 0) {