/* See the DRL-LICENSE file for this file's software license. */ /** Allows us to use pthread_rwlocks. */ #define _XOPEN_SOURCE 600 /* malloc(), NULL */ #include /* getpid() */ #include /* Socket functions. */ #include /* memset() */ #include /* perror() */ #include /* FD_ZERO() */ #include #include #include "raterouter.h" #include "ratetypes.h" #include "drl_state.h" #include "peer_comm.h" #include "swim.h" #include "logging.h" #ifdef BUILD_ZOOKEEPER #include "zk_drl.h" #endif extern limiter_t limiter; static int group_membership_init(comm_t *comm, uint32_t id, ident_config *config) { switch (comm->gossip.membership) { case SWIM: return swim_init(comm, id); break; #ifdef BUILD_ZOOKEEPER case ZOOKEEPER: return zk_drl_init(comm, id, &limiter, config); break; #endif default: printlog(LOG_CRITICAL, "drl_state.c: This shouldn't happen!\n"); return EINVAL; } } static void group_membership_teardown(comm_t *comm) { switch (comm->gossip.membership) { case SWIM: swim_teardown(comm); break; #ifdef BUILD_ZOOKEEPER case ZOOKEEPER: zk_drl_close(comm); break; #endif default: printlog(LOG_CRITICAL, "drl_state.c: This shouldn't happen!\n"); } } void null_restart_function(comm_t *comm, int32_t view_number) { /* Intentionally empty. */ } int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) { int i; int result = 0; memset(comm, 0, sizeof(comm_t)); comm->comm_fabric = config->commfabric; comm->transport_proto = UDP; comm->remote_node_count = config->peer_count; comm->gossip.gossip_branch = config->branch; comm->gossip.membership = config->membership; comm->gossip.failure_behavior = config->failure_behavior; comm->gossip.weight = 1.0; pthread_mutex_init(&comm->lock, NULL); // allocate memory to the indices comm->indices = (int*) malloc(sizeof(int)*comm->remote_node_count); memset(comm->indices, 0, sizeof(int)*comm->remote_node_count); for(i = 0; i < comm->remote_node_count; i++) comm->indices[i] = i; comm->shuffle_index = comm->remote_node_count; /* Set default comm function pointers. These may get overwritten later * by more specific initialization routines such as group membership * init calls. */ switch (config->commfabric) { case COMM_MESH: comm->send_function = send_udp_mesh; comm->recv_function = recv_mesh; comm->restart_function = null_restart_function; break; case COMM_GOSSIP: comm->send_function = send_udp_gossip; comm->recv_function = recv_gossip; comm->restart_function = null_restart_function; break; } comm->remote_node_map = allocate_map(); if (comm->remote_node_map == NULL) { pthread_mutex_destroy(&comm->lock); return ENOMEM; } /* Allocate remote_limiters array and fill it in. Add remotes to map. */ comm->remote_limiters = malloc(config->peer_count * sizeof(remote_limiter_t)); if (comm->remote_limiters == NULL) { pthread_mutex_destroy(&comm->lock); free_map(comm->remote_node_map, 0); return ENOMEM; } memset(comm->remote_limiters, 0, config->peer_count * sizeof(remote_limiter_t)); for (i = 0; i < config->peer_count; ++i) { comm->remote_limiters[i].addr = remote_nodes[i].addr; comm->remote_limiters[i].port = remote_nodes[i].port; comm->remote_limiters[i].outgoing.next_seqno = 1; comm->remote_limiters[i].reachability = REACHABLE; comm->remote_limiters[i].awol = 0; comm->remote_limiters[i].count_rounds = 0; comm->remote_limiters[i].count_awol = 0; comm->remote_limiters[i].count_alive = 0; map_insert(comm->remote_node_map, (void *) &(remote_nodes[i]), sizeof(remote_node_t), &comm->remote_limiters[i]); } /* Allocate and initialize selected. */ comm->selected = malloc(config->branch * sizeof(int)); if (comm->selected == NULL) { pthread_mutex_destroy(&comm->lock); free_map(comm->remote_node_map, 0); free(comm->remote_limiters); return ENOMEM; } for (i = 0; i < config->branch; ++i) { comm->selected[i] = -1; } if (config->commfabric == COMM_GOSSIP) { result = group_membership_init(comm, config->id, config); if (result) { pthread_mutex_destroy(&comm->lock); free_map(comm->remote_node_map, 0); free(comm->remote_limiters); free(comm->selected); } } return result; } void free_comm(comm_t *comm) { if (comm) { if (comm->comm_fabric == COMM_GOSSIP) { group_membership_teardown(comm); } if (comm->remote_limiters) { free(comm->remote_limiters); } if (comm->remote_nodes) { free(comm->remote_nodes); } if (comm->remote_node_map) { free_map(comm->remote_node_map, 0); } pthread_mutex_destroy(&comm->lock); if (comm->selected) { free(comm->selected); } } } int read_comm(double *aggregate, uint32_t *effective_global, comm_t *comm, uint32_t global_limit) { remote_limiter_t *remote; pthread_mutex_lock(&comm->lock); if (comm->comm_fabric == COMM_MESH) { *aggregate = 0; *effective_global = global_limit; map_reset_iterate(comm->remote_node_map); while ((remote = map_next(comm->remote_node_map))) { if (remote->reachability != REACHABLE) { printlog(LOG_WARN, "AWOL remote limiter detected.\n"); *effective_global -= (global_limit / (comm->remote_node_count + 1)); } else { /* remote->rate corresponds to the rate (GRD) or weight (FPS) * in generated by the peer remote. */ *aggregate += remote->rate; } } *aggregate += comm->local_rate; } else if (comm->comm_fabric == COMM_GOSSIP) { double value = 0; int i; value = (comm->gossip.value / comm->gossip.weight); value *= (comm->remote_node_count + 1); /* Look up the failure handling policy and check to see if it is * is currently relevant. */ if (comm->gossip.failure_behavior == PANIC) { int panic = 0; if (!comm->connected) { panic = 1; } for (i = 0; i < comm->remote_node_count; ++i) { if (comm->remote_limiters[i].reachability != REACHABLE) { panic = 1; } } if (panic) { printlog(LOG_DEBUG, "GOSSIP: Panicking!\n"); *aggregate = comm->local_rate; *effective_global = (global_limit / (comm->remote_node_count + 1)); } else { *aggregate = (value > 0) ? value : 0; *effective_global = global_limit; } } else if (comm->gossip.failure_behavior == QUORUM) { *effective_global = global_limit; if (comm->connected) { for (i = 0; i < comm->remote_node_count; ++i) { if (comm->remote_limiters[i].reachability != REACHABLE) { *effective_global -= (global_limit / (comm->remote_node_count + 1)); } } *aggregate = (value > 0) ? value : 0; } else { /* Not part of the Quorum - do 1/n. */ printlog(LOG_DEBUG, "GOSSIP: Not in the quorum...Panicking!\n"); *aggregate = comm->local_rate; *effective_global = (global_limit / (comm->remote_node_count + 1)); } } printlog(LOG_DEBUG, "GOSSIP: Read aggregate of %.3f from comm layer.\n", *aggregate); } else { printlog(LOG_CRITICAL, "read_comm: Invalid comm fabric: %d.\n", comm->comm_fabric); pthread_mutex_unlock(&comm->lock); return EINVAL; } pthread_mutex_unlock(&comm->lock); return 0; } int write_local_value(comm_t *comm, const double value) { pthread_mutex_lock(&comm->lock); if (comm->comm_fabric == COMM_MESH) { comm->last_local_rate = comm->local_rate; comm->local_rate = value; } else if (comm->comm_fabric == COMM_GOSSIP) { comm->last_local_rate = comm->local_rate; comm->local_rate = value; comm->gossip.value += (comm->local_rate - comm->last_local_rate); printlog(LOG_DEBUG, "GOSSIP: value: %.3f, new gossip.value: %.3f\n", value, comm->gossip.value); } else { printlog(LOG_CRITICAL, "write_local_value: Invalid comm fabric: %d.\n", comm->comm_fabric); pthread_mutex_unlock(&comm->lock); return EINVAL; } pthread_mutex_unlock(&comm->lock); return 0; } int send_update(comm_t *comm, uint32_t id) { int result = 0; pthread_mutex_lock(&comm->lock); result = comm->send_function(comm, id, limiter.udp_socket); pthread_mutex_unlock(&comm->lock); return result; } void *limiter_receive_thread(void *unused) { printlog(LOG_DEBUG, "GOSSIP: Starting the limiter_receive thread.\n"); sigset_t signal_mask; sigemptyset(&signal_mask); sigaddset(&signal_mask, SIGHUP); sigaddset(&signal_mask, SIGUSR1); sigaddset(&signal_mask, SIGUSR2); sigaddset(&signal_mask, SIGRTMAX); pthread_sigmask(SIG_BLOCK, &signal_mask, NULL); pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); while (1) { limiter_receive(); } pthread_exit(NULL); }