Importing all of DRL, including ulogd and all of its files.
[distributedratelimiting.git] / drl / drl_state.c
diff --git a/drl/drl_state.c b/drl/drl_state.c
new file mode 100644 (file)
index 0000000..8583d0d
--- /dev/null
@@ -0,0 +1,223 @@
+/* See the DRL-LICENSE file for this file's software license. */
+
+/** Allows us to use pthread_rwlocks. */
+#define _XOPEN_SOURCE 600
+
+/* malloc(), NULL */
+#include <stdlib.h>
+
+/* getpid() */
+#include <unistd.h>
+
+/* Socket functions. */
+#include <sys/socket.h>
+
+/* memset() */
+#include <string.h>
+
+/* perror() */
+#include <errno.h>
+
+/* FD_ZERO() */
+#include <sys/select.h>
+
+#include <assert.h>
+
+#include "raterouter.h"
+#include "ratetypes.h"
+#include "drl_state.h"
+#include "peer_comm.h"
+#include "logging.h"
+
+extern limiter_t limiter;
+
+int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) {
+    int i;
+
+    memset(comm, 0, sizeof(comm_t));
+
+    comm->comm_fabric = config->commfabric;
+    comm->transport_proto = UDP;
+    comm->remote_node_count = config->peer_count;
+    comm->gossip.gossip_branch = config->branch;
+    comm->gossip.weight = 1.0;
+
+    pthread_mutex_init(&comm->lock, NULL);
+
+    /* Set send function. */
+    switch (config->commfabric) {
+        case COMM_MESH:
+            comm->send_function = send_udp_mesh;
+            break;
+        case COMM_GOSSIP:
+            comm->send_function = send_udp_gossip;
+            break;
+    }
+
+    comm->remote_node_map = allocate_map();
+    if (comm->remote_node_map == NULL) {
+        pthread_mutex_destroy(&comm->lock);
+        return ENOMEM;
+    }
+
+    /* Allocate remote_limiters array and fill it in. Add remotes to map. */
+    comm->remote_limiters =
+                        malloc(config->peer_count * sizeof(remote_limiter_t));
+
+    if (comm->remote_limiters == NULL) {
+        pthread_mutex_destroy(&comm->lock);
+        free_map(comm->remote_node_map, 0);
+        return ENOMEM;
+    }
+
+    memset(comm->remote_limiters, 0, config->peer_count * sizeof(remote_limiter_t));
+
+    for (i = 0; i < config->peer_count; ++i) {
+        comm->remote_limiters[i].addr = remote_nodes[i].addr;
+        comm->remote_limiters[i].port = remote_nodes[i].port;
+        comm->remote_limiters[i].outgoing.next_seqno = 1;
+        map_insert(comm->remote_node_map, (void *) &(remote_nodes[i]),
+                   sizeof(remote_node_t), &comm->remote_limiters[i]);
+    }
+
+    /* Allocate and initialize retrys. */
+    comm->retrys = malloc(config->branch * sizeof(int));
+    if (comm->retrys == NULL) {
+        pthread_mutex_destroy(&comm->lock);
+        free_map(comm->remote_node_map, 0);
+        free(comm->remote_limiters);
+        return ENOMEM;
+    }
+
+    for (i = 0; i < config->branch; ++i) {
+        comm->retrys[i] = -1;
+    }
+
+    return 0;
+}
+
+void free_comm(comm_t *comm) {
+    if (comm) {
+        if (comm->remote_limiters) {
+            free(comm->remote_limiters);
+        }
+
+        if (comm->remote_nodes) {
+            free(comm->remote_nodes);
+        }
+
+        if (comm->remote_node_map) {
+            free_map(comm->remote_node_map, 0);
+        }
+
+        pthread_mutex_destroy(&comm->lock);
+
+        if (comm->retrys) {
+            free(comm->retrys);
+        }
+    }
+}
+
+int read_comm(comm_t *comm, double *aggregate) {
+    remote_limiter_t *remote;
+
+    pthread_mutex_lock(&comm->lock);
+    if (comm->comm_fabric == COMM_MESH) {
+        *aggregate = 0;
+        map_reset_iterate(comm->remote_node_map);
+        while ((remote = map_next(comm->remote_node_map))) {
+            /* remote->rate corresponds to the rate (GRD) or weight (FPS)
+             * in generated by the peer remote. */
+            *aggregate += remote->rate;
+
+            /* If we continue to read it without having heard an update,
+             * we start to decay its value. */
+            if (remote->awol >= REMOTE_AWOL_THRESHOLD) {
+                remote->rate = remote->rate / 2;
+            } else {
+                remote->awol++;
+            }
+        }
+        *aggregate += comm->local_rate;
+    } else if (comm->comm_fabric == COMM_GOSSIP) {
+        double value = 0;
+        value = (comm->gossip.value / comm->gossip.weight);
+        value *= (comm->remote_node_count + 1);
+
+        /* Keep around the last value so that we don't stupidly pick 0 when
+         * we're negative.  If we pick 0, it looks to the limiter like it
+         * has free reign and it will take 100% of the rate allocation for
+         * itself. */
+        if (value <= 0) {
+            //*aggregate = comm->gossip.last_nonzero;
+            *aggregate = 0;
+            //printf("*****Gossip value is %.3f  (%u)  ((%d))\n", value, *aggregate, (int) *aggregate);
+        } else {
+            *aggregate = value;
+            comm->gossip.last_nonzero = *aggregate;
+            //printf("Gossip value is %.3f  (%u)  ((%d))\n", value, *aggregate, (int) *aggregate);
+        }
+    } else {
+        printlog(LOG_CRITICAL, "read_comm: Invalid comm fabric: %d.\n",
+                 comm->comm_fabric);
+        pthread_mutex_unlock(&comm->lock);
+        return EINVAL;
+    }
+    pthread_mutex_unlock(&comm->lock);
+
+    //printf("read: %.3f\n", *aggregate);
+
+    return 0;
+}
+
+int write_local_value(comm_t *comm, const double value) {
+    pthread_mutex_lock(&comm->lock);
+    if (comm->comm_fabric == COMM_MESH) {
+        comm->last_local_rate = comm->local_rate;
+        comm->local_rate = value;
+        comm->rate_change = comm->local_rate - comm->last_local_rate;
+    } else if (comm->comm_fabric == COMM_GOSSIP) {
+        comm->last_local_rate = comm->local_rate;
+        comm->local_rate = value;
+        comm->rate_change = comm->local_rate - comm->last_local_rate;
+        /*printf("new: %f, old: %f, weight: %f, diff: %f\n", comm->gossip.value + (comm->gossip.weight * comm->rate_change), comm->gossip.value, comm->gossip.weight, comm->rate_change);*/
+        /*comm->gossip.value = comm->gossip.value + (comm->gossip.weight * comm->rate_change);*/
+        comm->gossip.value += comm->rate_change;
+    }
+    else {
+        printlog(LOG_CRITICAL, "write_local_value: Invalid comm fabric: %d.\n",
+                 comm->comm_fabric);
+        pthread_mutex_unlock(&comm->lock);
+        return EINVAL;
+    }
+    pthread_mutex_unlock(&comm->lock);
+
+    return 0;
+}
+
+int send_update(comm_t *comm, uint32_t id) {
+    int result = 0;
+
+    pthread_mutex_lock(&comm->lock);
+
+    result = comm->send_function(comm, id, limiter.udp_socket);
+
+    pthread_mutex_unlock(&comm->lock);
+
+    return result;
+}
+
+void *limiter_receive_thread(void *unused) {
+    sigset_t signal_mask;
+
+    sigemptyset(&signal_mask);
+    sigaddset(&signal_mask, SIGHUP);
+    pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
+
+    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
+    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
+    while (1) {
+        limiter_receive();
+    }
+    pthread_exit(NULL);
+}