X-Git-Url: http://git.onelab.eu/?p=distributedratelimiting.git;a=blobdiff_plain;f=drl%2Fdrl_state.c;h=d0baa4ae6e6314875a0cdd51a5dcfa2345793ac2;hp=8583d0d4edf3f23339f83cebc599cf7ce9b95896;hb=8675c0b77ad3e361f4255ce61881a79061c5238d;hpb=0be9704d6b24d09ebd55beedec52758cb88c570b

diff --git a/drl/drl_state.c b/drl/drl_state.c
index 8583d0d..d0baa4a 100644
--- a/drl/drl_state.c
+++ b/drl/drl_state.c
@@ -27,12 +27,61 @@
 #include "ratetypes.h"
 #include "drl_state.h"
 #include "peer_comm.h"
+#include "swim.h"
 #include "logging.h"
 
+#ifdef BUILD_ZOOKEEPER
+#include "zk_drl.h"
+#endif
+
 extern limiter_t limiter;
 
+static int group_membership_init(comm_t *comm, uint32_t id, ident_config *config) {
+    switch (comm->gossip.membership) {
+        case SWIM:
+            return swim_init(comm, id);
+            break;
+
+#ifdef BUILD_ZOOKEEPER
+
+        case ZOOKEEPER:
+            return zk_drl_init(comm, id, &limiter, config);
+            break;
+
+#endif
+
+        default:
+            printlog(LOG_CRITICAL, "drl_state.c: This shouldn't happen!\n");
+            return EINVAL;
+    }
+}
+
+static void group_membership_teardown(comm_t *comm) {
+    switch (comm->gossip.membership) {
+        case SWIM:
+            swim_teardown(comm);
+            break;
+
+#ifdef BUILD_ZOOKEEPER
+
+        case ZOOKEEPER:
+            zk_drl_close(comm);
+            break;
+
+#endif
+
+        default:
+            printlog(LOG_CRITICAL, "drl_state.c: This shouldn't happen!\n");
+    }
+}
+
+void null_restart_function(comm_t *comm, int32_t view_number) {
+    /* Intentionally empty. */
+}
+
 int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) {
     int i;
+    int result = 0;
 
     memset(comm, 0, sizeof(comm_t));
 
@@ -40,17 +89,32 @@ int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) {
     comm->transport_proto = UDP;
     comm->remote_node_count = config->peer_count;
     comm->gossip.gossip_branch = config->branch;
+    comm->gossip.membership = config->membership;
+    comm->gossip.failure_behavior = config->failure_behavior;
     comm->gossip.weight = 1.0;
     pthread_mutex_init(&comm->lock, NULL);
-
-    /* Set send function. */
+
+    // allocate memory to the indices
+    comm->indices = (int*) malloc(sizeof(int)*comm->remote_node_count);
+    memset(comm->indices, 0, sizeof(int)*comm->remote_node_count);
+    for(i = 0; i < comm->remote_node_count; i++)
+        comm->indices[i] = i;
+    comm->shuffle_index = comm->remote_node_count;
+
+    /* Set default comm function pointers.  These may get overwritten later
+     * by more specific initialization routines such as group membership
+     * init calls. */
     switch (config->commfabric) {
         case COMM_MESH:
             comm->send_function = send_udp_mesh;
+            comm->recv_function = recv_mesh;
+            comm->restart_function = null_restart_function;
            break;
         case COMM_GOSSIP:
             comm->send_function = send_udp_gossip;
+            comm->recv_function = recv_gossip;
+            comm->restart_function = null_restart_function;
            break;
     }
 
@@ -61,8 +125,7 @@ int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) {
     }
 
     /* Allocate remote_limiters array and fill it in.  Add remotes to map. */
-    comm->remote_limiters =
-        malloc(config->peer_count * sizeof(remote_limiter_t));
+    comm->remote_limiters = malloc(config->peer_count * sizeof(remote_limiter_t));
 
     if (comm->remote_limiters == NULL) {
         pthread_mutex_destroy(&comm->lock);
@@ -76,13 +139,18 @@ int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) {
         comm->remote_limiters[i].addr = remote_nodes[i].addr;
         comm->remote_limiters[i].port = remote_nodes[i].port;
         comm->remote_limiters[i].outgoing.next_seqno = 1;
+        comm->remote_limiters[i].reachability = REACHABLE;
+        comm->remote_limiters[i].awol = 0;
+        comm->remote_limiters[i].count_rounds = 0;
+        comm->remote_limiters[i].count_awol = 0;
+        comm->remote_limiters[i].count_alive = 0;
         map_insert(comm->remote_node_map, (void *) &(remote_nodes[i]),
                    sizeof(remote_node_t), &comm->remote_limiters[i]);
     }
-
-    /* Allocate and initialize retrys. */
-    comm->retrys = malloc(config->branch * sizeof(int));
-    if (comm->retrys == NULL) {
+
+    /* Allocate and initialize selected. */
+    comm->selected = malloc(config->branch * sizeof(int));
+    if (comm->selected == NULL) {
         pthread_mutex_destroy(&comm->lock);
         free_map(comm->remote_node_map, 0);
         free(comm->remote_limiters);
@@ -90,14 +158,28 @@ int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) {
     }
 
     for (i = 0; i < config->branch; ++i) {
-        comm->retrys[i] = -1;
+        comm->selected[i] = -1;
     }
 
-    return 0;
+    if (config->commfabric == COMM_GOSSIP) {
+        result = group_membership_init(comm, config->id, config);
+        if (result) {
+            pthread_mutex_destroy(&comm->lock);
+            free_map(comm->remote_node_map, 0);
+            free(comm->remote_limiters);
+            free(comm->selected);
+        }
+    }
+
+    return result;
 }
 
 void free_comm(comm_t *comm) {
     if (comm) {
+        if (comm->comm_fabric == COMM_GOSSIP) {
+            group_membership_teardown(comm);
+        }
+
         if (comm->remote_limiters) {
             free(comm->remote_limiters);
         }
@@ -112,51 +194,76 @@ void free_comm(comm_t *comm) {
 
         pthread_mutex_destroy(&comm->lock);
 
-        if (comm->retrys) {
-            free(comm->retrys);
+        if (comm->selected) {
+            free(comm->selected);
         }
     }
 }
 
-int read_comm(comm_t *comm, double *aggregate) {
+int read_comm(double *aggregate, uint32_t *effective_global, comm_t *comm, uint32_t global_limit) {
     remote_limiter_t *remote;
     pthread_mutex_lock(&comm->lock);
     if (comm->comm_fabric == COMM_MESH) {
         *aggregate = 0;
+        *effective_global = global_limit;
         map_reset_iterate(comm->remote_node_map);
         while ((remote = map_next(comm->remote_node_map))) {
-            /* remote->rate corresponds to the rate (GRD) or weight (FPS)
-             * in generated by the peer remote. */
-            *aggregate += remote->rate;
-
-            /* If we continue to read it without having heard an update,
-             * we start to decay its value. */
-            if (remote->awol >= REMOTE_AWOL_THRESHOLD) {
-                remote->rate = remote->rate / 2;
+            if (remote->reachability != REACHABLE) {
+                printlog(LOG_WARN, "AWOL remote limiter detected.\n");
+                *effective_global -= (global_limit / (comm->remote_node_count + 1));
             } else {
-                remote->awol++;
+                /* remote->rate corresponds to the rate (GRD) or weight (FPS)
+                 * in generated by the peer remote. */
+                *aggregate += remote->rate;
             }
         }
         *aggregate += comm->local_rate;
     } else if (comm->comm_fabric == COMM_GOSSIP) {
         double value = 0;
+        int i;
 
         value = (comm->gossip.value / comm->gossip.weight);
         value *= (comm->remote_node_count + 1);
 
-        /* Keep around the last value so that we don't stupidly pick 0 when
-         * we're negative.  If we pick 0, it looks to the limiter like it
-         * has free reign and it will take 100% of the rate allocation for
-         * itself. */
-        if (value <= 0) {
-            //*aggregate = comm->gossip.last_nonzero;
-            *aggregate = 0;
-            //printf("*****Gossip value is %.3f (%u) ((%d))\n", value, *aggregate, (int) *aggregate);
-        } else {
-            *aggregate = value;
-            comm->gossip.last_nonzero = *aggregate;
-            //printf("Gossip value is %.3f (%u) ((%d))\n", value, *aggregate, (int) *aggregate);
+        /* Look up the failure handling policy and check to see if it is
+         * is currently relevant. */
+        if (comm->gossip.failure_behavior == PANIC) {
+            int panic = 0;
+            if (!comm->connected) {
+                panic = 1;
+            }
+
+            for (i = 0; i < comm->remote_node_count; ++i) {
+                if (comm->remote_limiters[i].reachability != REACHABLE) {
+                    panic = 1;
+                }
+            }
+
+            if (panic) {
+                printlog(LOG_DEBUG, "GOSSIP: Panicking!\n");
+                *aggregate = comm->local_rate;
+                *effective_global = (global_limit / (comm->remote_node_count + 1));
+            } else {
+                *aggregate = (value > 0) ? value : 0;
+                *effective_global = global_limit;
+            }
+        } else if (comm->gossip.failure_behavior == QUORUM) {
+            *effective_global = global_limit;
+            if (comm->connected) {
+                for (i = 0; i < comm->remote_node_count; ++i) {
+                    if (comm->remote_limiters[i].reachability != REACHABLE) {
+                        *effective_global -= (global_limit / (comm->remote_node_count + 1));
+                    }
+                }
+                *aggregate = (value > 0) ? value : 0;
+            } else {
+                /* Not part of the Quorum - do 1/n. */
+                printlog(LOG_DEBUG, "GOSSIP: Not in the quorum...Panicking!\n");
+                *aggregate = comm->local_rate;
+                *effective_global = (global_limit / (comm->remote_node_count + 1));
+            }
         }
+        printlog(LOG_DEBUG, "GOSSIP: Read aggregate of %.3f from comm layer.\n", *aggregate);
     } else {
         printlog(LOG_CRITICAL, "read_comm: Invalid comm fabric: %d.\n",
                  comm->comm_fabric);
@@ -165,8 +272,6 @@ int read_comm(comm_t *comm, double *aggregate) {
     }
 
     pthread_mutex_unlock(&comm->lock);
 
-    //printf("read: %.3f\n", *aggregate);
-
     return 0;
 }
 
@@ -175,14 +280,11 @@ int write_local_value(comm_t *comm, const double value) {
     if (comm->comm_fabric == COMM_MESH) {
         comm->last_local_rate = comm->local_rate;
         comm->local_rate = value;
-        comm->rate_change = comm->local_rate - comm->last_local_rate;
     } else if (comm->comm_fabric == COMM_GOSSIP) {
         comm->last_local_rate = comm->local_rate;
         comm->local_rate = value;
-        comm->rate_change = comm->local_rate - comm->last_local_rate;
-        /*printf("new: %f, old: %f, weight: %f, diff: %f\n", comm->gossip.value + (comm->gossip.weight * comm->rate_change), comm->gossip.value, comm->gossip.weight, comm->rate_change);*/
-        /*comm->gossip.value = comm->gossip.value + (comm->gossip.weight * comm->rate_change);*/
-        comm->gossip.value += comm->rate_change;
+        comm->gossip.value += (comm->local_rate - comm->last_local_rate);
+        printlog(LOG_DEBUG, "GOSSIP: value: %.3f, new gossip.value: %.3f\n", value, comm->gossip.value);
     } else {
         printlog(LOG_CRITICAL, "write_local_value: Invalid comm fabric: %d.\n",
@@ -208,10 +310,13 @@ int send_update(comm_t *comm, uint32_t id) {
 }
 
 void *limiter_receive_thread(void *unused) {
+    printlog(LOG_DEBUG, "GOSSIP: Starting the limiter_receive thread.\n");
     sigset_t signal_mask;
     sigemptyset(&signal_mask);
     sigaddset(&signal_mask, SIGHUP);
+    sigaddset(&signal_mask, SIGUSR1);
+    sigaddset(&signal_mask, SIGUSR2);
     pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
 
     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
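
Usage sketch (illustrative, not part of the commit): the hunks above change the
read_comm() prototype from read_comm(comm_t *comm, double *aggregate) to
read_comm(double *aggregate, uint32_t *effective_global, comm_t *comm, uint32_t global_limit),
so a caller now receives both the aggregate demand and an effective global limit
that the comm layer reduces by global_limit / (n + 1) for each unreachable peer,
or collapses to a 1/(n + 1) share under the PANIC policy or loss of quorum.
The helper below, apply_limit_example(), is hypothetical and exists only to show
one way the new signature might be called; comm_t and read_comm() are assumed to
be declared by the project headers exactly as this file includes them.

#include <stdint.h>
#include "ratetypes.h"   /* comm_t (as included by drl_state.c) */
#include "drl_state.h"   /* read_comm() prototype */

/* Hypothetical caller of the revised read_comm() interface. */
static void apply_limit_example(comm_t *comm, uint32_t global_limit) {
    double aggregate = 0.0;
    uint32_t effective_global = global_limit;

    /* read_comm() returns 0 on success and fills in both output parameters. */
    if (read_comm(&aggregate, &effective_global, comm, global_limit) == 0) {
        /* ... derive the local rate share from aggregate, but never let the
         * identity exceed effective_global ... */
    }
}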