--- /dev/null
+/* See the DRL-LICENSE file for this file's software license. */
+
+/** Allows us to use pthread_rwlocks. */
+#define _XOPEN_SOURCE 600
+
+/* malloc(), NULL */
+#include <stdlib.h>
+
+/* getpid() */
+#include <unistd.h>
+
+/* Socket functions. */
+#include <sys/socket.h>
+
+/* memset() */
+#include <string.h>
+
+/* perror() */
+#include <errno.h>
+
+/* FD_ZERO() */
+#include <sys/select.h>
+
+#include <assert.h>
+
+#include "raterouter.h"
+#include "ratetypes.h"
+#include "drl_state.h"
+#include "peer_comm.h"
+#include "logging.h"
+
+extern limiter_t limiter;
+
+int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) {
+ int i;
+
+ memset(comm, 0, sizeof(comm_t));
+
+ comm->comm_fabric = config->commfabric;
+ comm->transport_proto = UDP;
+ comm->remote_node_count = config->peer_count;
+ comm->gossip.gossip_branch = config->branch;
+ comm->gossip.weight = 1.0;
+
+ pthread_mutex_init(&comm->lock, NULL);
+
+ /* Set send function. */
+ switch (config->commfabric) {
+ case COMM_MESH:
+ comm->send_function = send_udp_mesh;
+ break;
+ case COMM_GOSSIP:
+ comm->send_function = send_udp_gossip;
+ break;
+ }
+
+ comm->remote_node_map = allocate_map();
+ if (comm->remote_node_map == NULL) {
+ pthread_mutex_destroy(&comm->lock);
+ return ENOMEM;
+ }
+
+ /* Allocate remote_limiters array and fill it in. Add remotes to map. */
+ comm->remote_limiters =
+ malloc(config->peer_count * sizeof(remote_limiter_t));
+
+ if (comm->remote_limiters == NULL) {
+ pthread_mutex_destroy(&comm->lock);
+ free_map(comm->remote_node_map, 0);
+ return ENOMEM;
+ }
+
+ memset(comm->remote_limiters, 0, config->peer_count * sizeof(remote_limiter_t));
+
+ for (i = 0; i < config->peer_count; ++i) {
+ comm->remote_limiters[i].addr = remote_nodes[i].addr;
+ comm->remote_limiters[i].port = remote_nodes[i].port;
+ comm->remote_limiters[i].outgoing.next_seqno = 1;
+ map_insert(comm->remote_node_map, (void *) &(remote_nodes[i]),
+ sizeof(remote_node_t), &comm->remote_limiters[i]);
+ }
+
+ /* Allocate and initialize retrys. */
+ comm->retrys = malloc(config->branch * sizeof(int));
+ if (comm->retrys == NULL) {
+ pthread_mutex_destroy(&comm->lock);
+ free_map(comm->remote_node_map, 0);
+ free(comm->remote_limiters);
+ return ENOMEM;
+ }
+
+ for (i = 0; i < config->branch; ++i) {
+ comm->retrys[i] = -1;
+ }
+
+ return 0;
+}
+
+void free_comm(comm_t *comm) {
+ if (comm) {
+ if (comm->remote_limiters) {
+ free(comm->remote_limiters);
+ }
+
+ if (comm->remote_nodes) {
+ free(comm->remote_nodes);
+ }
+
+ if (comm->remote_node_map) {
+ free_map(comm->remote_node_map, 0);
+ }
+
+ pthread_mutex_destroy(&comm->lock);
+
+ if (comm->retrys) {
+ free(comm->retrys);
+ }
+ }
+}
+
+int read_comm(comm_t *comm, double *aggregate) {
+ remote_limiter_t *remote;
+
+ pthread_mutex_lock(&comm->lock);
+ if (comm->comm_fabric == COMM_MESH) {
+ *aggregate = 0;
+ map_reset_iterate(comm->remote_node_map);
+ while ((remote = map_next(comm->remote_node_map))) {
+ /* remote->rate corresponds to the rate (GRD) or weight (FPS)
+ * in generated by the peer remote. */
+ *aggregate += remote->rate;
+
+ /* If we continue to read it without having heard an update,
+ * we start to decay its value. */
+ if (remote->awol >= REMOTE_AWOL_THRESHOLD) {
+ remote->rate = remote->rate / 2;
+ } else {
+ remote->awol++;
+ }
+ }
+ *aggregate += comm->local_rate;
+ } else if (comm->comm_fabric == COMM_GOSSIP) {
+ double value = 0;
+ value = (comm->gossip.value / comm->gossip.weight);
+ value *= (comm->remote_node_count + 1);
+
+ /* Keep around the last value so that we don't stupidly pick 0 when
+ * we're negative. If we pick 0, it looks to the limiter like it
+ * has free reign and it will take 100% of the rate allocation for
+ * itself. */
+ if (value <= 0) {
+ //*aggregate = comm->gossip.last_nonzero;
+ *aggregate = 0;
+ //printf("*****Gossip value is %.3f (%u) ((%d))\n", value, *aggregate, (int) *aggregate);
+ } else {
+ *aggregate = value;
+ comm->gossip.last_nonzero = *aggregate;
+ //printf("Gossip value is %.3f (%u) ((%d))\n", value, *aggregate, (int) *aggregate);
+ }
+ } else {
+ printlog(LOG_CRITICAL, "read_comm: Invalid comm fabric: %d.\n",
+ comm->comm_fabric);
+ pthread_mutex_unlock(&comm->lock);
+ return EINVAL;
+ }
+ pthread_mutex_unlock(&comm->lock);
+
+ //printf("read: %.3f\n", *aggregate);
+
+ return 0;
+}
+
+int write_local_value(comm_t *comm, const double value) {
+ pthread_mutex_lock(&comm->lock);
+ if (comm->comm_fabric == COMM_MESH) {
+ comm->last_local_rate = comm->local_rate;
+ comm->local_rate = value;
+ comm->rate_change = comm->local_rate - comm->last_local_rate;
+ } else if (comm->comm_fabric == COMM_GOSSIP) {
+ comm->last_local_rate = comm->local_rate;
+ comm->local_rate = value;
+ comm->rate_change = comm->local_rate - comm->last_local_rate;
+ /*printf("new: %f, old: %f, weight: %f, diff: %f\n", comm->gossip.value + (comm->gossip.weight * comm->rate_change), comm->gossip.value, comm->gossip.weight, comm->rate_change);*/
+ /*comm->gossip.value = comm->gossip.value + (comm->gossip.weight * comm->rate_change);*/
+ comm->gossip.value += comm->rate_change;
+ }
+ else {
+ printlog(LOG_CRITICAL, "write_local_value: Invalid comm fabric: %d.\n",
+ comm->comm_fabric);
+ pthread_mutex_unlock(&comm->lock);
+ return EINVAL;
+ }
+ pthread_mutex_unlock(&comm->lock);
+
+ return 0;
+}
+
+int send_update(comm_t *comm, uint32_t id) {
+ int result = 0;
+
+ pthread_mutex_lock(&comm->lock);
+
+ result = comm->send_function(comm, id, limiter.udp_socket);
+
+ pthread_mutex_unlock(&comm->lock);
+
+ return result;
+}
+
+void *limiter_receive_thread(void *unused) {
+ sigset_t signal_mask;
+
+ sigemptyset(&signal_mask);
+ sigaddset(&signal_mask, SIGHUP);
+ pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
+
+ pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
+ while (1) {
+ limiter_receive();
+ }
+ pthread_exit(NULL);
+}