/* See the DRL-LICENSE file for this file's software license. */ /** Allows us to use pthread_rwlocks. */ #define _XOPEN_SOURCE 600 /* malloc(), NULL */ #include /* getpid() */ #include /* Socket functions. */ #include /* memset() */ #include /* perror() */ #include /* FD_ZERO() */ #include #include #include "raterouter.h" #include "ratetypes.h" #include "drl_state.h" #include "peer_comm.h" #include "logging.h" extern limiter_t limiter; int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) { int i; memset(comm, 0, sizeof(comm_t)); comm->comm_fabric = config->commfabric; comm->transport_proto = UDP; comm->remote_node_count = config->peer_count; comm->gossip.gossip_branch = config->branch; comm->gossip.weight = 1.0; pthread_mutex_init(&comm->lock, NULL); /* Set send function. */ switch (config->commfabric) { case COMM_MESH: comm->send_function = send_udp_mesh; break; case COMM_GOSSIP: comm->send_function = send_udp_gossip; break; } comm->remote_node_map = allocate_map(); if (comm->remote_node_map == NULL) { pthread_mutex_destroy(&comm->lock); return ENOMEM; } /* Allocate remote_limiters array and fill it in. Add remotes to map. */ comm->remote_limiters = malloc(config->peer_count * sizeof(remote_limiter_t)); if (comm->remote_limiters == NULL) { pthread_mutex_destroy(&comm->lock); free_map(comm->remote_node_map, 0); return ENOMEM; } memset(comm->remote_limiters, 0, config->peer_count * sizeof(remote_limiter_t)); for (i = 0; i < config->peer_count; ++i) { comm->remote_limiters[i].addr = remote_nodes[i].addr; comm->remote_limiters[i].port = remote_nodes[i].port; comm->remote_limiters[i].outgoing.next_seqno = 1; map_insert(comm->remote_node_map, (void *) &(remote_nodes[i]), sizeof(remote_node_t), &comm->remote_limiters[i]); } /* Allocate and initialize retrys. */ comm->retrys = malloc(config->branch * sizeof(int)); if (comm->retrys == NULL) { pthread_mutex_destroy(&comm->lock); free_map(comm->remote_node_map, 0); free(comm->remote_limiters); return ENOMEM; } for (i = 0; i < config->branch; ++i) { comm->retrys[i] = -1; } return 0; } void free_comm(comm_t *comm) { if (comm) { if (comm->remote_limiters) { free(comm->remote_limiters); } if (comm->remote_nodes) { free(comm->remote_nodes); } if (comm->remote_node_map) { free_map(comm->remote_node_map, 0); } pthread_mutex_destroy(&comm->lock); if (comm->retrys) { free(comm->retrys); } } } int read_comm(comm_t *comm, double *aggregate, double decayto) { remote_limiter_t *remote; pthread_mutex_lock(&comm->lock); if (comm->comm_fabric == COMM_MESH) { *aggregate = 0; map_reset_iterate(comm->remote_node_map); while ((remote = map_next(comm->remote_node_map))) { /* remote->rate corresponds to the rate (GRD) or weight (FPS) * in generated by the peer remote. */ *aggregate += remote->rate; /* If we continue to read it without having heard an update, * we start to make the peer's value approach decayto, getting * half of the way there each time. */ if (remote->awol >= REMOTE_AWOL_THRESHOLD) { printlog(LOG_WARN, "AWOL remote limiter detected.\n"); remote->rate += ((decayto - remote->rate) / 2); } else { remote->awol++; } } *aggregate += comm->local_rate; } else if (comm->comm_fabric == COMM_GOSSIP) { double value = 0; value = (comm->gossip.value / comm->gossip.weight); value *= (comm->remote_node_count + 1); /* Keep around the last value so that we don't stupidly pick 0 when * we're negative. If we pick 0, it looks to the limiter like it * has free reign and it will take 100% of the rate allocation for * itself. */ if (value <= 0) { //*aggregate = comm->gossip.last_nonzero; *aggregate = 0; //printf("*****Gossip value is %.3f (%u) ((%d))\n", value, *aggregate, (int) *aggregate); } else { *aggregate = value; comm->gossip.last_nonzero = *aggregate; //printf("Gossip value is %.3f (%u) ((%d))\n", value, *aggregate, (int) *aggregate); } } else { printlog(LOG_CRITICAL, "read_comm: Invalid comm fabric: %d.\n", comm->comm_fabric); pthread_mutex_unlock(&comm->lock); return EINVAL; } pthread_mutex_unlock(&comm->lock); //printf("read: %.3f\n", *aggregate); return 0; } int write_local_value(comm_t *comm, const double value) { pthread_mutex_lock(&comm->lock); if (comm->comm_fabric == COMM_MESH) { comm->last_local_rate = comm->local_rate; comm->local_rate = value; comm->rate_change = comm->local_rate - comm->last_local_rate; } else if (comm->comm_fabric == COMM_GOSSIP) { comm->last_local_rate = comm->local_rate; comm->local_rate = value; comm->rate_change = comm->local_rate - comm->last_local_rate; /*printf("new: %f, old: %f, weight: %f, diff: %f\n", comm->gossip.value + (comm->gossip.weight * comm->rate_change), comm->gossip.value, comm->gossip.weight, comm->rate_change);*/ /*comm->gossip.value = comm->gossip.value + (comm->gossip.weight * comm->rate_change);*/ comm->gossip.value += comm->rate_change; } else { printlog(LOG_CRITICAL, "write_local_value: Invalid comm fabric: %d.\n", comm->comm_fabric); pthread_mutex_unlock(&comm->lock); return EINVAL; } pthread_mutex_unlock(&comm->lock); return 0; } int send_update(comm_t *comm, uint32_t id) { int result = 0; pthread_mutex_lock(&comm->lock); result = comm->send_function(comm, id, limiter.udp_socket); pthread_mutex_unlock(&comm->lock); return result; } void *limiter_receive_thread(void *unused) { sigset_t signal_mask; sigemptyset(&signal_mask); sigaddset(&signal_mask, SIGHUP); sigaddset(&signal_mask, SIGUSR1); pthread_sigmask(SIG_BLOCK, &signal_mask, NULL); pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); while (1) { limiter_receive(); } pthread_exit(NULL); }