Updates to autotools for library detection
[distributedratelimiting.git] / drl / zk_drl.c
diff --git a/drl/zk_drl.c b/drl/zk_drl.c
new file mode 100644 (file)
index 0000000..a9fd92d
--- /dev/null
@@ -0,0 +1,346 @@
+/* See the DRL-LICENSE file for this file's software license. */
+
+#ifdef BUILD_ZOOKEEPER
+
+#include <arpa/inet.h>
+#include <assert.h>
+#include <inttypes.h>
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include "raterouter.h"
+#include "ratetypes.h"
+#include "drl_state.h"
+#include "peer_comm.h"
+#include "zk_drl.h"
+#include "logging.h"
+
+#define NULL_LEN (-1)
+#define PATH_LEN (64)
+#define PATH_BUFFER_LEN (64)
+
+static int32_t read_path_cversion(zhandle_t *zkhandle, const char *path) {
+    struct Stat stat;
+    int zoo_result;
+
+    memset(&stat, 0, sizeof(struct Stat));
+
+    zoo_result = zoo_exists(zkhandle, path, 0, &stat);
+
+    if (zoo_result != ZOK) {
+        return -1;
+    }
+
+    return stat.cversion;
+}
+
+static int process_membership_change(zhandle_t *zkhandle, zkdrlcontext_t *context, const char *path) {
+    struct String_vector children;
+    int32_t view_before = 0;
+    int32_t view_after = view_before + 1; //Needs to be != to view_before
+    int zoo_result = 0;
+    int i;
+
+    while (view_before != view_after) {
+        view_before = read_path_cversion(zkhandle, path);
+
+        zoo_result = zoo_get_children(zkhandle, path, 1, &children);
+        if (zoo_result != ZOK) {
+            return zoo_result;
+        }
+
+        view_after = read_path_cversion(zkhandle, path);
+    }
+
+    if (view_after > context->comm->gossip.view) {
+        printlog(LOG_DEBUG, "ZK:zookeeper watch says we need to restart with a new view.\n");
+        context->comm->restart_function(context->comm, view_after);
+    }
+
+    /* Clear the remote limiter list.  This will be overwritten below for
+     * limiters that are found to be in the new view. */
+    for (i = 0; i < context->comm->remote_node_count; ++i) {
+        context->comm->remote_limiters[i].reachability = UNREACHABLE;
+        context->comm->remote_limiters[i].view = view_after;
+        context->comm->remote_limiters[i].view_confidence = NOTIN;
+    }
+
+    for (i = 0; i < children.count; ++i) {
+        remote_limiter_t *remote_limiter = NULL;
+        remote_node_t remote_node;
+
+        memset(&remote_node, 0, sizeof(remote_node_t));
+
+        printlog(LOG_DEBUG, "ZK:children.data[%d] is %s\n", i, children.data[i]);
+
+        sscanf(children.data[i], "%u", &remote_node.addr);
+        remote_node.port = htons(LIMITER_LISTEN_PORT);
+
+        if (remote_node.addr != context->local_addr) {
+            printlog(LOG_DEBUG, "ZK:searching map for %u:%u\n", remote_node.addr, remote_node.port);
+            remote_limiter = map_search(context->comm->remote_node_map, &remote_node, sizeof(remote_node_t));
+            assert(remote_limiter != NULL);
+            remote_limiter->reachability = REACHABLE;
+            remote_limiter->view_confidence = IN;
+        } else {
+            printlog(LOG_DEBUG, "ZK: %u is my own addr.\n", remote_node.addr);
+        }
+    }
+    
+    assert(view_after >= 0);
+
+    context->comm->connected = 1;
+
+    return ZOK;
+}
+
+static void zk_connected(zhandle_t *zkhandle, zkdrlcontext_t *context) {
+    char path[PATH_LEN];
+    char path_buffer[PATH_BUFFER_LEN];
+    int zoo_result = 0;
+
+    printlog(LOG_DEBUG, "ZK:(Re)Connected to zookeeper.\n");
+
+    sprintf(path, "/%u", context->id);
+
+    zoo_result = zoo_create(zkhandle, path, NULL, NULL_LEN, &ZOO_OPEN_ACL_UNSAFE, 0, path_buffer, PATH_BUFFER_LEN);
+
+    if (zoo_result == ZOK) {
+        printlog(LOG_DEBUG, "ZK: created path %s\n", path);
+    } else {
+        //An error occurred.  It was probably already there.
+        printlog(LOG_DEBUG, "ZK: error creating path %s: %d\n", path, zoo_result);
+    }
+
+    sprintf(path, "/%u/%u", context->id, context->local_addr);
+    
+    zoo_result = zoo_create(zkhandle, path, NULL, NULL_LEN, &ZOO_READ_ACL_UNSAFE, ZOO_EPHEMERAL, path_buffer, PATH_BUFFER_LEN);
+
+    if (zoo_result == ZOK) {
+        printlog(LOG_DEBUG, "ZK: created path %s\n", path);
+    } else {
+        printlog(LOG_DEBUG, "ZK: error creating path %s: %d\n", path, zoo_result);
+    }
+
+    sprintf(path, "/%u", context->id);
+
+    zoo_result = process_membership_change(zkhandle, context, path);
+
+    if (zoo_result != ZOK) {
+        printlog(LOG_WARN, "ZK: process_membership_change failed?\n");
+    }
+}
+
+static void zk_disconnected(zhandle_t *zkhandle, zkdrlcontext_t *context) {
+    printlog(LOG_DEBUG, "ZK:Disconnected from zookeeper.\n");
+
+    context->comm->connected = 0;
+}
+
+static void zk_membership_change(zhandle_t *zkhandle, const char *path, zkdrlcontext_t *context) {
+    int zoo_result = 0;
+
+    printlog(LOG_DEBUG, "ZK:zookeeper child list changed.\n");
+
+    zoo_result = process_membership_change(zkhandle, context, path);
+}
+
+void zk_drl_restart(comm_t *comm, int32_t view_number) {
+    int i;
+
+    comm->gossip.value = comm->local_rate;
+    comm->gossip.weight = 1.0;
+    comm->gossip.view = view_number;
+
+    for (i = 0; i < comm->remote_node_count; ++i) {
+        if (comm->remote_limiters[i].view < view_number) {
+            comm->remote_limiters[i].rate = 0;
+            memset(&comm->remote_limiters[i].incoming, 0, sizeof(in_neighbor_t));
+            memset(&comm->remote_limiters[i].outgoing, 0, sizeof(out_neighbor_t));
+            comm->remote_limiters[i].view = view_number;
+            comm->remote_limiters[i].view_confidence = UNSURE;
+        }
+    }
+
+    printlog(LOG_DEBUG, "ZK: Changing view to %d\n", view_number);
+}
+
+static void zk_drl_watcher(zhandle_t *zkhandle, int type, int state, const char *path, void *context_ptr) {
+    zkdrlcontext_t *context = (zkdrlcontext_t *) context_ptr;
+
+    pthread_rwlock_rdlock(context->limiter_lock);
+    pthread_mutex_lock(&context->comm->lock);
+
+    if (type == ZOO_SESSION_EVENT) {
+        if (state == ZOO_CONNECTED_STATE) {
+            /* We're newly connected - set that watch! */
+            zk_connected(zkhandle, context);
+        } else if (state == ZOO_CONNECTING_STATE) {
+            /* We're no longer connected.  Do something safe. */
+            zk_disconnected(zkhandle, context);
+        } else if (state == ZOO_EXPIRED_SESSION_STATE) {
+            printlog(LOG_DEBUG, "ZK:zookeeper session expired - reconnecting.\n");
+            context->zkhandle = zookeeper_init(context->zk_host, zk_drl_watcher, 10000, NULL, context, 0);
+        } else {
+            printlog(LOG_DEBUG, "ZK:Unhandled event zk_drl_watcher: type is %d, state is %d, path is %s\n", type, state, path);
+        }
+    } else if (type == ZOO_CHILD_EVENT) {
+        /* The list of child nodes in the group has changed.  Re-read the
+         * group membership list and re-set the watch. */
+        zk_membership_change(zkhandle, path, context);
+    } else {
+        printlog(LOG_DEBUG, "ZK:Unhandled event zk_drl_watcher: type is %d, state is %d, path is %s\n", type, state, path);
+    }
+
+    pthread_mutex_unlock(&context->comm->lock);
+    pthread_rwlock_unlock(context->limiter_lock);
+}
+
+int zk_drl_recv(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) {
+    if (msg->type == ACK) {
+        /* If ACK was received then reset the awol count */
+        if (msg->view == comm->gossip.view && msg->seqno == remote->outgoing.next_seqno - 1) {
+            /* Ack for most recent message.  Clear saved state. */
+            remote->outgoing.first_seqno = remote->outgoing.next_seqno;
+            remote->outgoing.saved_value = 0;
+            remote->outgoing.saved_weight = 0;
+            remote->awol = 0;
+
+        } else if (msg->view > comm->gossip.view) {
+            printlog(LOG_DEBUG, "ZK:Received ack for newer view, restarting.\n");
+            comm->restart_function(comm, msg->view);
+            remote->view_confidence = IN;
+            remote->awol = 0;
+        }
+        /* Ignore ack if it isn't for most recent message or its from an old view. */
+    } else if (msg->type == MSG) {
+        if (msg->view == comm->gossip.view) {
+            if (msg->min_seqno > remote->incoming.seen_seqno) {
+                /* Entirely new information */
+                remote->incoming.seen_seqno = msg->seqno;
+                remote->incoming.saved_value = msg->value;
+                remote->incoming.saved_weight = msg->weight;
+                comm->gossip.value += msg->value;
+                comm->gossip.weight += msg->weight;
+                send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+                remote->awol = 0;
+            } 
+            else if (msg->seqno > remote->incoming.seen_seqno) {
+                /* Only some of the message is old news. */
+                double diff_value = msg->value - remote->incoming.saved_value;
+                double diff_weight = msg->weight - remote->incoming.saved_weight;
+
+                remote->incoming.seen_seqno = msg->seqno;
+                remote->incoming.saved_value = msg->value;
+                remote->incoming.saved_weight = msg->weight;
+
+                comm->gossip.value += diff_value;
+                comm->gossip.weight += diff_weight;
+                send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+                remote->awol = 0;
+            } 
+            else {
+                /* The entire message is old news. (Duplicate). */
+                /* Do nothing. */
+            }
+        } else if (msg->view > comm->gossip.view) {
+            printlog(LOG_DEBUG, "ZK:received message with a newer viewstamp, restarting.\n");
+            comm->restart_function(comm, msg->view);
+            remote->view_confidence = IN;
+
+            remote->incoming.seen_seqno = msg->seqno;
+            remote->incoming.saved_value = msg->value;
+            remote->incoming.saved_weight = msg->weight;
+            comm->gossip.value += msg->value;
+            comm->gossip.weight += msg->weight;
+            send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+            remote->awol = 0;
+        } else if (msg->view < comm->gossip.view) {
+            printlog(LOG_DEBUG, "ZK:received a message with an older viewstamp.\n");
+            if (remote->view_confidence == IN) {
+                /* The sender is in the new view and doesn't know it yet. */
+                send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+            } else if (remote->view_confidence == UNSURE) {
+                /* We don't know if he's in or not. */
+                send_ack(id, remote, msg->seqno, UNSUREACK, comm->gossip.view);
+            } else if (remote->view_confidence == NOTIN) {
+                /* He's out of luck... */
+                send_ack(id, remote, msg->seqno, NACK, comm->gossip.view);
+            }
+            remote->awol = 0;
+        }
+    } else if (msg->type == UNSUREACK) {
+        /* We received an ack, but the ack sender was unsure whether or not
+         * we'll be a part of its new view.  Can't do much here... */
+        if (msg->view > comm->gossip.view) {
+            remote->view = msg->view;
+            remote->view_confidence = IN;
+            remote->awol = 0;
+            printlog(LOG_DEBUG, "ZK:received an UNSUREACK for view %d\n", msg->view);
+        }
+    } else if (msg->type == NACK) {
+        if (msg->view > comm->gossip.view) {
+            remote->view = msg->view;
+            remote->view_confidence = IN;
+            remote->awol = 0;
+
+            comm->connected = 0;
+            printlog(LOG_DEBUG, "ZK:received a NACK for view %d\n", msg->view);
+        }
+    }
+
+    return 0;
+}
+
+int zk_drl_init(comm_t *comm, uint32_t id, limiter_t *limiter, ident_config *config) {
+    zkdrlcontext_t *context = NULL;
+    comm->connected = 0;
+
+    if ((context = malloc(sizeof(zkdrlcontext_t))) == NULL) {
+        return ENOMEM;
+    }
+
+    context->zk_host = config->zk_host;
+    context->limiter_lock = &limiter->limiter_lock;
+    context->comm = comm;
+    context->id = id;
+    context->local_addr = limiter->localaddr;
+    comm->membership_state = context;
+
+    printlog(LOG_DEBUG, "ZK: Calling zk init\n");
+
+    context->zkhandle = zookeeper_init(context->zk_host, zk_drl_watcher, 10000, NULL, context, 0);
+
+    if (context->zkhandle == NULL) {
+        printlog(LOG_CRITICAL, "ZK: docs say that this can fail, but they don't say why. :(  Errno is %d\n", errno);
+        return EINVAL;
+    }
+
+    comm->recv_function = zk_drl_recv;
+    comm->send_function = send_udp_gossip;
+    comm->restart_function = zk_drl_restart;
+
+    return 0;
+}
+
+int zk_drl_close(comm_t *comm) {
+    zkdrlcontext_t *context = (zkdrlcontext_t *) comm->membership_state;
+
+    zookeeper_close(context->zkhandle);
+    
+    if (context && context->zk_host) {
+        free(context->zk_host);
+        context->zk_host = NULL;
+    }
+
+    if (context) {
+        free(context);
+        comm->membership_state = NULL;
+    }
+
+    return 0;
+}
+
+#endif  /* BUILD_ZOOKEEPER */