X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=drl%2Fdrl_state.c;fp=drl%2Fdrl_state.c;h=8583d0d4edf3f23339f83cebc599cf7ce9b95896;hb=0be9704d6b24d09ebd55beedec52758cb88c570b;hp=0000000000000000000000000000000000000000;hpb=6747e89080a8265aa73320bd9f40a0fa6e1c161e;p=distributedratelimiting.git diff --git a/drl/drl_state.c b/drl/drl_state.c new file mode 100644 index 0000000..8583d0d --- /dev/null +++ b/drl/drl_state.c @@ -0,0 +1,223 @@ +/* See the DRL-LICENSE file for this file's software license. */ + +/** Allows us to use pthread_rwlocks. */ +#define _XOPEN_SOURCE 600 + +/* malloc(), NULL */ +#include + +/* getpid() */ +#include + +/* Socket functions. */ +#include + +/* memset() */ +#include + +/* perror() */ +#include + +/* FD_ZERO() */ +#include + +#include + +#include "raterouter.h" +#include "ratetypes.h" +#include "drl_state.h" +#include "peer_comm.h" +#include "logging.h" + +extern limiter_t limiter; + +int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) { + int i; + + memset(comm, 0, sizeof(comm_t)); + + comm->comm_fabric = config->commfabric; + comm->transport_proto = UDP; + comm->remote_node_count = config->peer_count; + comm->gossip.gossip_branch = config->branch; + comm->gossip.weight = 1.0; + + pthread_mutex_init(&comm->lock, NULL); + + /* Set send function. */ + switch (config->commfabric) { + case COMM_MESH: + comm->send_function = send_udp_mesh; + break; + case COMM_GOSSIP: + comm->send_function = send_udp_gossip; + break; + } + + comm->remote_node_map = allocate_map(); + if (comm->remote_node_map == NULL) { + pthread_mutex_destroy(&comm->lock); + return ENOMEM; + } + + /* Allocate remote_limiters array and fill it in. Add remotes to map. */ + comm->remote_limiters = + malloc(config->peer_count * sizeof(remote_limiter_t)); + + if (comm->remote_limiters == NULL) { + pthread_mutex_destroy(&comm->lock); + free_map(comm->remote_node_map, 0); + return ENOMEM; + } + + memset(comm->remote_limiters, 0, config->peer_count * sizeof(remote_limiter_t)); + + for (i = 0; i < config->peer_count; ++i) { + comm->remote_limiters[i].addr = remote_nodes[i].addr; + comm->remote_limiters[i].port = remote_nodes[i].port; + comm->remote_limiters[i].outgoing.next_seqno = 1; + map_insert(comm->remote_node_map, (void *) &(remote_nodes[i]), + sizeof(remote_node_t), &comm->remote_limiters[i]); + } + + /* Allocate and initialize retrys. */ + comm->retrys = malloc(config->branch * sizeof(int)); + if (comm->retrys == NULL) { + pthread_mutex_destroy(&comm->lock); + free_map(comm->remote_node_map, 0); + free(comm->remote_limiters); + return ENOMEM; + } + + for (i = 0; i < config->branch; ++i) { + comm->retrys[i] = -1; + } + + return 0; +} + +void free_comm(comm_t *comm) { + if (comm) { + if (comm->remote_limiters) { + free(comm->remote_limiters); + } + + if (comm->remote_nodes) { + free(comm->remote_nodes); + } + + if (comm->remote_node_map) { + free_map(comm->remote_node_map, 0); + } + + pthread_mutex_destroy(&comm->lock); + + if (comm->retrys) { + free(comm->retrys); + } + } +} + +int read_comm(comm_t *comm, double *aggregate) { + remote_limiter_t *remote; + + pthread_mutex_lock(&comm->lock); + if (comm->comm_fabric == COMM_MESH) { + *aggregate = 0; + map_reset_iterate(comm->remote_node_map); + while ((remote = map_next(comm->remote_node_map))) { + /* remote->rate corresponds to the rate (GRD) or weight (FPS) + * in generated by the peer remote. */ + *aggregate += remote->rate; + + /* If we continue to read it without having heard an update, + * we start to decay its value. */ + if (remote->awol >= REMOTE_AWOL_THRESHOLD) { + remote->rate = remote->rate / 2; + } else { + remote->awol++; + } + } + *aggregate += comm->local_rate; + } else if (comm->comm_fabric == COMM_GOSSIP) { + double value = 0; + value = (comm->gossip.value / comm->gossip.weight); + value *= (comm->remote_node_count + 1); + + /* Keep around the last value so that we don't stupidly pick 0 when + * we're negative. If we pick 0, it looks to the limiter like it + * has free reign and it will take 100% of the rate allocation for + * itself. */ + if (value <= 0) { + //*aggregate = comm->gossip.last_nonzero; + *aggregate = 0; + //printf("*****Gossip value is %.3f (%u) ((%d))\n", value, *aggregate, (int) *aggregate); + } else { + *aggregate = value; + comm->gossip.last_nonzero = *aggregate; + //printf("Gossip value is %.3f (%u) ((%d))\n", value, *aggregate, (int) *aggregate); + } + } else { + printlog(LOG_CRITICAL, "read_comm: Invalid comm fabric: %d.\n", + comm->comm_fabric); + pthread_mutex_unlock(&comm->lock); + return EINVAL; + } + pthread_mutex_unlock(&comm->lock); + + //printf("read: %.3f\n", *aggregate); + + return 0; +} + +int write_local_value(comm_t *comm, const double value) { + pthread_mutex_lock(&comm->lock); + if (comm->comm_fabric == COMM_MESH) { + comm->last_local_rate = comm->local_rate; + comm->local_rate = value; + comm->rate_change = comm->local_rate - comm->last_local_rate; + } else if (comm->comm_fabric == COMM_GOSSIP) { + comm->last_local_rate = comm->local_rate; + comm->local_rate = value; + comm->rate_change = comm->local_rate - comm->last_local_rate; + /*printf("new: %f, old: %f, weight: %f, diff: %f\n", comm->gossip.value + (comm->gossip.weight * comm->rate_change), comm->gossip.value, comm->gossip.weight, comm->rate_change);*/ + /*comm->gossip.value = comm->gossip.value + (comm->gossip.weight * comm->rate_change);*/ + comm->gossip.value += comm->rate_change; + } + else { + printlog(LOG_CRITICAL, "write_local_value: Invalid comm fabric: %d.\n", + comm->comm_fabric); + pthread_mutex_unlock(&comm->lock); + return EINVAL; + } + pthread_mutex_unlock(&comm->lock); + + return 0; +} + +int send_update(comm_t *comm, uint32_t id) { + int result = 0; + + pthread_mutex_lock(&comm->lock); + + result = comm->send_function(comm, id, limiter.udp_socket); + + pthread_mutex_unlock(&comm->lock); + + return result; +} + +void *limiter_receive_thread(void *unused) { + sigset_t signal_mask; + + sigemptyset(&signal_mask); + sigaddset(&signal_mask, SIGHUP); + pthread_sigmask(SIG_BLOCK, &signal_mask, NULL); + + pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); + while (1) { + limiter_receive(); + } + pthread_exit(NULL); +}