Importing all of DRL, including ulogd and all of its files.
[distributedratelimiting.git] / drl / peer_comm.c
diff --git a/drl/peer_comm.c b/drl/peer_comm.c
new file mode 100644 (file)
index 0000000..cb30657
--- /dev/null
@@ -0,0 +1,613 @@
+/* See the DRL-LICENSE file for this file's software license. */
+
+#define _XOPEN_SOURCE 600
+
+/* Debug output. */
+#include <stdio.h>
+#include <stdlib.h>
+
+/* Socket functions. */
+#include <sys/types.h>
+#include <sys/socket.h>
+
+/* Byte ordering and address structures. */
+#include <arpa/inet.h>
+
+/* memset() */
+#include <string.h>
+
+/* close() & usleep */
+#include <unistd.h>
+
+/* Mutex lock/unlock. */
+#include <pthread.h>
+
+/* perror() */
+#include <errno.h>
+
+/* select() w/ timeout */
+#include <sys/select.h>
+#include <sys/time.h>
+
+/* assert() */
+#include <assert.h>
+
+/* sigaddset(), sigemptyset(), SIGHUP, etc. */
+#include <signal.h>
+
+/* DRL data structures. */
+#include "raterouter.h"
+#include "ratetypes.h"
+#include "drl_state.h"
+#include "peer_comm.h"
+#include "logging.h"
+
+extern limiter_t limiter;
+
+static const uint32_t MAGIC_MSG = 0x123123;
+static const uint32_t MAGIC_HELLO = 0x456456;
+static const uint16_t MSG = 1;
+static const uint16_t ACK = 2;
+
+static void message_to_hbo(message_t *msg) {
+    msg->magic = ntohl(msg->magic);
+    msg->ident_id = ntohl(msg->ident_id);
+    msg->seqno = ntohl(msg->seqno);
+    msg->min_seqno = ntohl(msg->min_seqno);
+    msg->type = ntohs(msg->type);
+    /* value is a double */
+    /* weight is a double */
+}
+
+static void message_to_nbo(message_t *msg) {
+    msg->magic = htonl(msg->magic);
+    msg->ident_id = htonl(msg->ident_id);
+    msg->seqno = htonl(msg->seqno);
+    msg->min_seqno = htonl(msg->min_seqno);
+    msg->type = htons(msg->type);
+    /* value is a double */
+    /* weight is a double */
+}
+
+static void hello_to_hbo(hello_t *hello) {
+    hello->magic = ntohl(hello->magic);
+    hello->ident_id = ntohl(hello->ident_id);
+    hello->port = ntohs(hello->port);
+}
+
+static void hello_to_nbo(hello_t *hello) {
+    hello->magic = htonl(hello->magic);
+    hello->ident_id = htonl(hello->ident_id);
+    hello->port = htons(hello->port);
+}
+
+static int is_connected(remote_limiter_t *remote) {
+    struct sockaddr_in addr;
+    socklen_t addrlen = sizeof(addr);
+
+    if (getpeername(remote->socket, (struct sockaddr *) &addr, &addrlen) == 0)
+        return 1;
+    else
+        return 0;
+}
+
+static int send_ack(identity_t *ident, remote_limiter_t *remote, uint32_t seqno) {
+    int result = 0;
+    message_t msg;
+    struct sockaddr_in toaddr;
+
+    memset(&toaddr, 0, sizeof(struct sockaddr_in));
+    toaddr.sin_family = AF_INET;
+
+    toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
+    toaddr.sin_port = remote->port;
+
+    memset(&msg, 0, sizeof(msg));
+    msg.magic = MAGIC_MSG;
+    msg.ident_id = ident->id;
+    msg.type = ACK;
+    msg.seqno = seqno;
+
+    message_to_nbo(&msg);
+
+    if (sendto(limiter.udp_socket, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+        printlog(LOG_WARN, "send_ack: sento failed.\n");
+        result = errno;
+    }
+
+    return result;
+}
+
+void limiter_receive() {
+    struct sockaddr_in fromaddr;
+    remote_node_t sender;
+    socklen_t fromlen = sizeof(fromaddr);
+    identity_t *ident = NULL;
+    remote_limiter_t *remote = NULL;
+    message_t msg;
+
+    if (recvfrom(limiter.udp_socket, &msg, sizeof(msg), MSG_WAITALL, (struct sockaddr *) &fromaddr, (socklen_t *) &fromlen) != sizeof(msg)) {
+        /* recv failed.  Log and continue. */
+        printlog(LOG_WARN, "recv failed to read full message.\n");
+        return;
+    }
+    memset(&sender, 0, sizeof(remote_node_t));
+    sender.addr = fromaddr.sin_addr.s_addr;
+    sender.port = fromaddr.sin_port;
+
+    message_to_hbo(&msg);
+
+    assert(msg.magic == MAGIC_MSG);
+
+#if 0
+    printlog(LOG_WARN, "Rcvd (value, weight) : (%f, %f) from ident %d (net order host 0x%x port %d) key size(%d)\n", 
+            msg.value, msg.weight, msg.ident_id, sender.addr,sender.port,sizeof(remote_node_t));
+#endif
+    pthread_testcancel();
+
+    pthread_rwlock_rdlock(&limiter.limiter_lock);
+
+    ident = map_search(limiter.stable_instance.ident_map, &msg.ident_id,
+                       sizeof(msg.ident_id));
+
+    if (ident == NULL) {
+        printlog(LOG_WARN, "WARN:recvd message for unknown identity.\n");
+        pthread_rwlock_unlock(&limiter.limiter_lock);
+        return;
+    }
+
+    pthread_mutex_lock(&ident->comm.lock);
+
+    remote = map_search(ident->comm.remote_node_map, &sender, sizeof(remote_node_t));
+
+    if (remote == NULL) {
+        printlog(LOG_WARN, "WARN: recvd msg from unknown entity.\n");
+        pthread_mutex_unlock(&ident->comm.lock);
+        pthread_rwlock_unlock(&limiter.limiter_lock);
+        return;
+    }
+
+    switch (ident->comm.comm_fabric) {
+        case COMM_MESH: {
+            /* Use the message's value to be our new GRDrate/FPSweight for the
+             * message's sender. */
+            remote->rate = msg.value;
+
+            /* Reset the AWOL counter to zero since we received an update. */
+            remote->awol = 0;
+        }
+        break;
+
+        case COMM_GOSSIP: {
+            if (msg.type == ACK) {
+                if (msg.seqno == remote->outgoing.next_seqno - 1) {
+                    int i;
+
+                    /* Ack for most recent message.  Clear saved state. */
+                    remote->outgoing.first_seqno = remote->outgoing.next_seqno;
+                    remote->outgoing.saved_value = 0;
+                    remote->outgoing.saved_weight = 0;
+
+                    for (i = 0; i < ident->comm.gossip.gossip_branch; ++i) {
+                        if (ident->comm.retrys[i] >= 0 &&
+                            remote == &ident->comm.remote_limiters[ident->comm.retrys[i]]) {
+                                //printf("clearing spot %d, it was %d\n", i, ident->retrys[i]);
+                                ident->comm.retrys[i] = -2;
+                        }
+                    }
+                }
+                /* Ignore ack if it isn't for most recent message. */
+            } else {
+                if (msg.min_seqno > remote->incoming.seen_seqno) {
+                    /* Entirely new information */
+                    remote->incoming.seen_seqno = msg.seqno;
+                    remote->incoming.saved_value = msg.value;
+                    remote->incoming.saved_weight = msg.weight;
+                    ident->comm.gossip.value += msg.value;
+                    ident->comm.gossip.weight += msg.weight;
+                    send_ack(ident, remote, msg.seqno);
+                } else if (msg.seqno > remote->incoming.seen_seqno) {
+                    /* Only some of the message is old news. */
+                    double diff_value = msg.value - remote->incoming.saved_value;
+                    double diff_weight = msg.weight - remote->incoming.saved_weight;
+
+                    remote->incoming.seen_seqno = msg.seqno;
+                    remote->incoming.saved_value = msg.value;
+                    remote->incoming.saved_weight = msg.weight;
+
+                    ident->comm.gossip.value += diff_value;
+                    ident->comm.gossip.weight += diff_weight;
+                    send_ack(ident, remote, msg.seqno);
+                } else {
+                    /* The entire message is old news. (Duplicate). */
+                    /* Do nothing. */
+                }
+            }
+        }
+        break;
+
+        default: {
+            printlog(LOG_CRITICAL, "ERR: Unknown identity comm fabric.\n");
+        }
+    }
+
+    pthread_mutex_unlock(&ident->comm.lock);
+    pthread_rwlock_unlock(&limiter.limiter_lock);
+}
+
+#if 0
+static void limiter_accept(comm_limiter_t *limiter) {
+    int sock, result;
+    struct sockaddr_in fromaddr;
+    socklen_t fromlen = sizeof(fromaddr);
+    remote_node_t sender;
+    remote_limiter_t *remote;
+    hello_t hello;
+    comm_ident_t *ident;
+    ident_handle *handle = NULL;
+
+    sock = accept(limiter->tcp_socket, (struct sockaddr *)&fromaddr, &fromlen);
+
+    assert(sock > 0);
+
+    memset(&hello, 0, sizeof(hello_t));
+    result = recv(sock, &hello, sizeof(hello_t), 0);
+
+    if (result < 0) {
+        close(sock);
+        return; /* Failure - ignore it. */
+    }
+
+    assert(result == sizeof(hello_t));
+
+    hello_to_hbo(&hello);
+
+    assert(hello.magic == MAGIC_HELLO);
+
+    memset(&sender, 0, sizeof(remote_node_t));
+    sender.addr = fromaddr.sin_addr.s_addr;
+    sender.port = ntohs(hello.port);
+
+    pthread_testcancel();
+
+    pthread_rwlock_rdlock(&limiter->rwlock);
+
+    handle = map_search(limiter->ident_id_to_handle, (void *) &hello.ident_id, sizeof(hello.ident_id));
+
+    if (handle == NULL) {
+        printlog(LOG_WARN, "WARN:recvd hello for unknown identity.\n");
+        pthread_rwlock_unlock(&limiter->rwlock);
+        return;
+    }
+
+    ident = limiter->identities[*handle];
+    assert(ident != NULL);
+
+    pthread_mutex_lock(&ident->lock);
+
+    remote = map_search(ident->remote_node_map, &sender, sizeof(remote_node_t));
+
+    if (remote == NULL) {
+        printlog(LOG_WARN, "WARN: Accepted connection from unknown identity.\n");
+        pthread_mutex_unlock(&ident->lock);
+        pthread_rwlock_unlock(&limiter->rwlock);
+        close(sock);
+        return;
+    }
+
+    if (is_connected(remote)) {
+        /* We are still connected, don't need the new socket. */
+        close(sock);
+        pthread_mutex_unlock(&ident->lock);
+        pthread_rwlock_unlock(&limiter->rwlock);
+        return;
+    }
+
+    /* We weren't connected, but we are now... */
+    remote->socket = sock;
+    printf("Got connection on: %d\n", sock);
+    FD_SET(sock, &ident->fds);
+
+    pthread_mutex_unlock(&ident->lock);
+    pthread_rwlock_unlock(&limiter->rwlock);
+}
+
+static void read_tcp_message(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock, int sock) {
+    int result;
+    message_t msg;
+
+    memset(&msg, 0, sizeof(message_t));
+
+    result = recv(sock, &msg, sizeof(message_t), 0);
+
+    if (result < 0) {
+        pthread_rwlock_rdlock(limiter_rwlock);
+        pthread_mutex_lock(&ident->lock);
+        FD_CLR(sock, &ident->fds);
+        close(sock);
+        pthread_mutex_unlock(&ident->lock);
+        pthread_rwlock_unlock(limiter_rwlock);
+        return;
+    }
+
+    assert(result == sizeof(message_t));
+
+    message_to_hbo(&msg);
+    assert(msg.magic == MAGIC_MSG);
+
+    pthread_rwlock_rdlock(limiter_rwlock);
+    pthread_mutex_lock(&ident->lock);
+
+    switch (ident->comm_fabric) {
+        case COMM_GOSSIP: {
+            ident->gossip.value += msg.value;
+            ident->gossip.weight += msg.weight;
+        }
+        break;
+
+        default: {
+            assert(1 == 0); /* This case shouldn't happen. Punt for now... */
+        }
+    }
+    pthread_mutex_unlock(&ident->lock);
+    pthread_rwlock_unlock(limiter_rwlock);
+}
+
+static void ident_receive(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock) {
+    int select_result, i;
+    fd_set fds_copy;
+    struct timeval timeout;
+
+    FD_ZERO(&fds_copy);
+    timeout.tv_sec = 15;
+    timeout.tv_usec = 0;
+
+    pthread_rwlock_rdlock(limiter_rwlock);
+    pthread_mutex_lock(&ident->lock);
+    memcpy(&fds_copy, &ident->fds, sizeof(fd_set));
+    pthread_mutex_unlock(&ident->lock);
+    pthread_rwlock_unlock(limiter_rwlock);
+    
+    /* mask interrupt signals for this thread? */
+
+    select_result = select(FD_SETSIZE, &fds_copy, NULL, NULL, &timeout);
+
+    assert(select_result >= 0);
+    
+    if (select_result == 0)
+        return; /* Timed out */
+
+    for (i = 0; (i < FD_SETSIZE) && select_result; ++i) {
+        if (FD_ISSET(i, &fds_copy)) {
+            read_tcp_message(ident, limiter_rwlock, i);
+            select_result--;
+        }
+    }
+}
+#endif
+
+int send_udp_mesh(comm_t *comm, uint32_t id, int sock) {
+    int result = 0;
+    remote_limiter_t *remote;
+    message_t msg;
+    struct sockaddr_in toaddr;
+
+    memset(&toaddr, 0, sizeof(struct sockaddr_in));
+    toaddr.sin_family = AF_INET;
+
+    memset(&msg, 0, sizeof(message_t));
+    msg.magic = MAGIC_MSG;
+    msg.ident_id = id;
+    msg.value = comm->local_rate;
+    /* Do we want seqnos for mesh?  We can get by without them. */
+
+    message_to_nbo(&msg);
+
+    /* Iterate though and send update to all remote limiters in our identity. */
+    map_reset_iterate(comm->remote_node_map);
+    while ((remote = map_next(comm->remote_node_map))) {
+        toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
+        toaddr.sin_port = remote->port;
+        if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+            printlog(LOG_CRITICAL, "ERR: limiter_send_mesh: sento failed.\n");
+            result = errno;
+            printlog(LOG_CRITICAL, "  - The error was |%d|\n", strerror(result));
+            break;
+        }
+    }
+
+    return result;
+}
+
+int send_udp_gossip(comm_t *comm, uint32_t id, int sock) {
+    int i, j, targetid;
+    int result = 0;
+    remote_limiter_t *remote;
+    struct sockaddr_in toaddr;
+    double msg_value, msg_weight;
+
+    memset(&toaddr, 0, sizeof(struct sockaddr_in));
+    toaddr.sin_family = AF_INET;
+
+    msg_value = comm->gossip.value / (comm->gossip.gossip_branch + 1);
+    msg_weight = comm->gossip.weight / (comm->gossip.gossip_branch + 1);
+
+    for (i = 0; i < comm->gossip.gossip_branch; ++i) {
+        message_t msg;
+
+        if (comm->retrys[i] >= 0) {
+            remote = &comm->remote_limiters[comm->retrys[i]];
+            targetid = comm->retrys[i];
+            //printf("%d:d:%d, ", i, comm->retrys[i]);
+        } else {
+            targetid = -2;
+
+            while (targetid == -2) {
+                targetid = myrand() % comm->remote_node_count;
+
+                for (j = 0; j < comm->gossip.gossip_branch; ++j) {
+                    if (targetid == comm->retrys[j]) {
+                        targetid = -2;
+                        break;
+                    }
+                }
+            }
+
+            remote = &comm->remote_limiters[targetid];
+            //printf("%d:r:%d, ", i, targetid);
+        }
+
+        toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
+        toaddr.sin_port = remote->port;
+
+        memset(&msg, 0, sizeof(message_t));
+        msg.magic = MAGIC_MSG;
+        msg.ident_id = id;
+        msg.value = msg_value + remote->outgoing.saved_value;
+        msg.weight = msg_weight + remote->outgoing.saved_weight;
+        msg.seqno = remote->outgoing.next_seqno;
+        msg.min_seqno = remote->outgoing.first_seqno;
+        msg.type = MSG;
+
+        remote->outgoing.next_seqno++;
+        remote->outgoing.saved_value += msg_value;
+        remote->outgoing.saved_weight += msg_weight;
+
+        message_to_nbo(&msg);
+
+        if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+            printlog(LOG_CRITICAL, "ERR: limiter_send_gossip: sento failed.\n");
+            result = errno;
+            break;
+        }
+
+        comm->retrys[i] = targetid;
+    }
+    //printf("\n");
+
+    comm->gossip.value = msg_value;
+    comm->gossip.weight = msg_weight;
+
+    return result;
+}
+
+#if 0
+int send_tcp_gossip(comm_ident_t *ident, FILE *logfile, int unused) {
+    int i, targetid, sock;
+    int result = 0;
+    message_t msg;
+
+    memset(&msg, 0, sizeof(message_t));
+    msg.magic = MAGIC_MSG;
+    msg.ident_id = ident->ident_id;
+    msg.value = ident->gossip.value / (ident->gossip.gossip_branch + 1);
+    msg.weight = ident->gossip.weight / (ident->gossip.gossip_branch + 1);
+
+    message_to_nbo(&msg);
+
+    for (i = 0; i < ident->gossip.gossip_branch; ++i) {
+        targetid = myrand() % ident->remote_node_count;
+        sock = ident->remote_limiters[targetid].socket;
+
+        result = send(sock, &msg, sizeof(message_t), 0);
+        if (result < 0) {
+            result = errno;
+            FD_CLR(sock, &ident->fds);
+            close(sock);
+            break;
+        }
+
+        assert(result == sizeof(message_t));
+    }
+
+    ident->gossip.value /= (ident->gossip.gossip_branch + 1);
+    ident->gossip.weight /= (ident->gossip.gossip_branch + 1);
+
+    return result;
+}
+#endif
+
+#if 0
+void *limiter_accept_thread(void *limiter) {
+    sigset_t signal_mask;
+
+    sigemptyset(&signal_mask);
+    sigaddset(&signal_mask, SIGHUP);
+    pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
+
+    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
+    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
+    while (1) {
+        limiter_accept((comm_limiter_t *) limiter);
+    }
+    pthread_exit(NULL);
+}
+void *ident_receive_thread(void *recv_args) {
+    int i, sock, result;
+    struct recv_thread_args *args = (struct recv_thread_args *) recv_args;
+    comm_ident_t *ident = args->ident;
+    pthread_rwlock_t *lock = args->lock;
+    uint16_t port = args->port;
+    struct sockaddr_in addr;
+    socklen_t addrlen = sizeof(addr);
+    hello_t hello;
+
+    free(args);
+
+    while (1) {
+        memset(&hello, 0, sizeof(hello_t));
+
+        /*Try to connect to all remote nodes if they aren't already connected.*/
+        pthread_rwlock_rdlock(lock);
+        pthread_mutex_lock(&ident->lock);
+
+        hello.magic = MAGIC_HELLO;
+        hello.ident_id = ident->ident_id;
+        hello.port = ntohs(port);
+
+        hello_to_nbo(&hello);
+
+        for (i = 0; i < ident->remote_node_count; ++i) {
+            if (is_connected(&ident->remote_limiters[i]))
+                continue; /* Ignore if it's already connected */
+
+            sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+            if (sock < 0) {
+                perror("socket");
+                continue;
+            }
+
+            assert(sock >= 0);
+
+            memset(&addr, 0, sizeof(struct sockaddr_in));
+            addr.sin_family = AF_INET;
+            addr.sin_port = ident->remote_limiters[i].port;
+            addr.sin_addr.s_addr = ident->remote_limiters[i].addr;
+
+            result = connect(sock, (struct sockaddr *) &addr, addrlen);
+            if (result < 0) {
+                close(sock);
+                continue;
+            }
+
+            result = send(sock, &hello, sizeof(hello_t), 0);
+            if (result < 0) {
+                close(sock);
+                continue;
+            }
+
+            assert(result == sizeof(hello_t));
+
+            ident->remote_limiters[i].socket = sock;
+            printf("Connected on socket: %d\n", sock);
+            FD_SET(sock, &ident->fds);
+        }
+
+        pthread_rwlock_unlock(lock);
+        pthread_mutex_unlock(&ident->lock);
+
+        ident_receive(ident, lock);
+    }
+    pthread_exit(NULL);
+}
+#endif