--- /dev/null
+/* See the DRL-LICENSE file for this file's software license. */
+
+#ifdef BUILD_ZOOKEEPER
+
+#include <arpa/inet.h>
+#include <assert.h>
+#include <errno.h>
+#include <inttypes.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "raterouter.h"
+#include "ratetypes.h"
+#include "drl_state.h"
+#include "peer_comm.h"
+#include "zk_drl.h"
+#include "logging.h"
+
+#define NULL_LEN (-1)
+#define PATH_LEN (64)
+#define PATH_BUFFER_LEN (64)
+
+/*
+ * Look up the child-version counter (cversion) of a znode.
+ *
+ * zkhandle: handle handed straight to zoo_exists().
+ * path:     znode path to query; no watch is requested.
+ *
+ * Returns the cversion field filled in by zoo_exists(), or -1 when
+ * zoo_exists() reports anything other than ZOK.
+ */
+static int32_t read_path_cversion(zhandle_t *zkhandle, const char *path) {
+    struct Stat node_stat;
+
+    memset(&node_stat, 0, sizeof(node_stat));
+
+    if (zoo_exists(zkhandle, path, 0, &node_stat) == ZOK) {
+        return node_stat.cversion;
+    }
+
+    return -1;
+}
+
+/*
+ * Re-read the list of children under `path`, (re)arm the child watch, and
+ * bring the remote limiter table in line with that list.
+ *
+ * The child list is read between two cversion reads and re-read until both
+ * reads agree, so the list and the view number used below belong together.
+ *
+ * zkhandle: ZooKeeper handle used for all requests.
+ * context:  membership context; context->comm is updated in place.
+ * path:     group znode whose children are the member nodes.
+ *
+ * Returns ZOK on success, the zoo_get_children() error if the listing fails,
+ * or ZSYSTEMERROR if the cversion of `path` could not be read. On error the
+ * limiter table is left untouched.
+ */
+static int process_membership_change(zhandle_t *zkhandle, zkdrlcontext_t *context, const char *path) {
+    struct String_vector children;
+    int32_t view_before;
+    int32_t view_after;
+    int zoo_result;
+    int i;
+
+    memset(&children, 0, sizeof(children));
+
+    for (;;) {
+        view_before = read_path_cversion(zkhandle, path);
+
+        /* The watch flag (1) re-arms the child watch on every attempt. */
+        zoo_result = zoo_get_children(zkhandle, path, 1, &children);
+        if (zoo_result != ZOK) {
+            return zoo_result;
+        }
+
+        view_after = read_path_cversion(zkhandle, path);
+        if (view_before == view_after) {
+            break;
+        }
+
+        /* The list changed underneath us: drop this snapshot and retry. */
+        deallocate_String_vector(&children);
+    }
+
+    /* Both reads returned -1: the version could not be read at all. Bail out
+     * before any limiter state is modified. */
+    if (view_after < 0) {
+        deallocate_String_vector(&children);
+        return ZSYSTEMERROR;
+    }
+
+    if (view_after > context->comm->gossip.view) {
+        printlog(LOG_DEBUG, "ZK:zookeeper watch says we need to restart with a new view.\n");
+        context->comm->restart_function(context->comm, view_after);
+    }
+
+    /* Clear the remote limiter list. This will be overwritten below for
+     * limiters that are found to be in the new view. */
+    for (i = 0; i < context->comm->remote_node_count; ++i) {
+        context->comm->remote_limiters[i].reachability = UNREACHABLE;
+        context->comm->remote_limiters[i].view = view_after;
+        context->comm->remote_limiters[i].view_confidence = NOTIN;
+    }
+
+    for (i = 0; i < children.count; ++i) {
+        remote_limiter_t *remote_limiter = NULL;
+        remote_node_t remote_node;
+
+        memset(&remote_node, 0, sizeof(remote_node_t));
+
+        printlog(LOG_DEBUG, "ZK:children.data[%d] is %s\n", i, children.data[i]);
+
+        /* Child names are the decimal address written by zk_connected(). */
+        if (sscanf(children.data[i], "%u", &remote_node.addr) != 1) {
+            printlog(LOG_WARN, "ZK:ignoring unparsable child name %s\n", children.data[i]);
+            continue;
+        }
+        remote_node.port = htons(LIMITER_LISTEN_PORT);
+
+        if (remote_node.addr == context->local_addr) {
+            printlog(LOG_DEBUG, "ZK: %u is my own addr.\n", remote_node.addr);
+            continue;
+        }
+
+        printlog(LOG_DEBUG, "ZK:searching map for %u:%u\n", remote_node.addr, remote_node.port);
+        remote_limiter = map_search(context->comm->remote_node_map, &remote_node, sizeof(remote_node_t));
+        if (remote_limiter == NULL) {
+            /* A member we have no table entry for: skip it rather than
+             * dereferencing NULL (an assert disappears under NDEBUG). */
+            printlog(LOG_WARN, "ZK:child %u is not a known remote limiter, ignoring.\n", remote_node.addr);
+            continue;
+        }
+
+        remote_limiter->reachability = REACHABLE;
+        remote_limiter->view_confidence = IN;
+    }
+
+    /* The vector was allocated by zoo_get_children(); release it. */
+    deallocate_String_vector(&children);
+
+    context->comm->connected = 1;
+
+    return ZOK;
+}
+
+/*
+ * Session (re)connected: register this node and load the membership list.
+ *
+ * Creates the group znode "/<id>" (flags 0) and this node's ZOO_EPHEMERAL
+ * child "/<id>/<local_addr>", logging the outcome of each create, then runs
+ * process_membership_change() on the group znode. Create failures are only
+ * logged; they do not stop the sequence.
+ */
+static void zk_connected(zhandle_t *zkhandle, zkdrlcontext_t *context) {
+    char group_path[PATH_LEN];
+    char member_path[PATH_LEN];
+    char created_name[PATH_BUFFER_LEN];
+    int rc;
+
+    printlog(LOG_DEBUG, "ZK:(Re)Connected to zookeeper.\n");
+
+    /* Group node. */
+    sprintf(group_path, "/%u", context->id);
+    rc = zoo_create(zkhandle, group_path, NULL, NULL_LEN, &ZOO_OPEN_ACL_UNSAFE, 0, created_name, PATH_BUFFER_LEN);
+    if (rc != ZOK) {
+        //An error occurred. It was probably already there.
+        printlog(LOG_DEBUG, "ZK: error creating path %s: %d\n", group_path, rc);
+    } else {
+        printlog(LOG_DEBUG, "ZK: created path %s\n", group_path);
+    }
+
+    /* Our own ephemeral member node. */
+    sprintf(member_path, "/%u/%u", context->id, context->local_addr);
+    rc = zoo_create(zkhandle, member_path, NULL, NULL_LEN, &ZOO_READ_ACL_UNSAFE, ZOO_EPHEMERAL, created_name, PATH_BUFFER_LEN);
+    if (rc != ZOK) {
+        printlog(LOG_DEBUG, "ZK: error creating path %s: %d\n", member_path, rc);
+    } else {
+        printlog(LOG_DEBUG, "ZK: created path %s\n", member_path);
+    }
+
+    /* Read the current members of the group. */
+    rc = process_membership_change(zkhandle, context, group_path);
+    if (rc != ZOK) {
+        printlog(LOG_WARN, "ZK: process_membership_change failed?\n");
+    }
+}
+
+/*
+ * Session lost: log it and clear comm->connected.
+ * zkhandle is accepted for symmetry with zk_connected() and is not used.
+ */
+static void zk_disconnected(zhandle_t *zkhandle, zkdrlcontext_t *context) {
+    (void) zkhandle;
+
+    printlog(LOG_DEBUG, "ZK:Disconnected from zookeeper.\n");
+
+    context->comm->connected = 0;
+}
+
+/*
+ * Child-list watch fired for `path`: re-read the membership list (which also
+ * re-arms the watch) and log a warning if that fails, matching what
+ * zk_connected() does for the same call.
+ */
+static void zk_membership_change(zhandle_t *zkhandle, const char *path, zkdrlcontext_t *context) {
+    int zoo_result;
+
+    printlog(LOG_DEBUG, "ZK:zookeeper child list changed.\n");
+
+    zoo_result = process_membership_change(zkhandle, context, path);
+    if (zoo_result != ZOK) {
+        printlog(LOG_WARN, "ZK: process_membership_change failed: %d\n", zoo_result);
+    }
+}
+
+/*
+ * Switch `comm` to view `view_number`.
+ *
+ * The local gossip state is reset to (value = local_rate, weight = 1.0) and
+ * stamped with the new view. Every remote limiter whose recorded view is
+ * older than view_number has its rate and its incoming/outgoing neighbor
+ * state zeroed, is moved to the new view, and is marked UNSURE. Limiters
+ * already at view_number or later are left alone.
+ */
+void zk_drl_restart(comm_t *comm, int32_t view_number) {
+    int idx;
+
+    comm->gossip.view = view_number;
+    comm->gossip.weight = 1.0;
+    comm->gossip.value = comm->local_rate;
+
+    for (idx = 0; idx < comm->remote_node_count; ++idx) {
+        remote_limiter_t *peer = &comm->remote_limiters[idx];
+
+        if (peer->view >= view_number) {
+            continue;
+        }
+
+        memset(&peer->incoming, 0, sizeof(in_neighbor_t));
+        memset(&peer->outgoing, 0, sizeof(out_neighbor_t));
+        peer->rate = 0;
+        peer->view = view_number;
+        peer->view_confidence = UNSURE;
+    }
+
+    printlog(LOG_DEBUG, "ZK: Changing view to %d\n", view_number);
+}
+
+/*
+ * Watcher callback registered with zookeeper_init().
+ *
+ * Holds context->limiter_lock (read) and context->comm->lock for the whole
+ * callback, then dispatches:
+ *   - session event, connected:  zk_connected()
+ *   - session event, connecting: zk_disconnected()
+ *   - session event, expired:    open a new session with zookeeper_init()
+ *   - child event:               zk_membership_change()
+ * Anything else is logged and ignored.
+ *
+ * context_ptr is the zkdrlcontext_t given to zookeeper_init().
+ */
+static void zk_drl_watcher(zhandle_t *zkhandle, int type, int state, const char *path, void *context_ptr) {
+    zkdrlcontext_t *context = (zkdrlcontext_t *) context_ptr;
+
+    pthread_rwlock_rdlock(context->limiter_lock);
+    pthread_mutex_lock(&context->comm->lock);
+
+    if (type == ZOO_SESSION_EVENT) {
+        if (state == ZOO_CONNECTED_STATE) {
+            /* We're newly connected - set that watch! */
+            zk_connected(zkhandle, context);
+        } else if (state == ZOO_CONNECTING_STATE) {
+            /* We're no longer connected. Do something safe. */
+            zk_disconnected(zkhandle, context);
+        } else if (state == ZOO_EXPIRED_SESSION_STATE) {
+            zhandle_t *new_handle;
+
+            printlog(LOG_DEBUG, "ZK:zookeeper session expired - reconnecting.\n");
+            new_handle = zookeeper_init(context->zk_host, zk_drl_watcher, 10000, NULL, context, 0);
+
+            if (new_handle == NULL) {
+                /* Keep the old handle so zk_drl_close() still has something
+                 * valid to close, instead of storing NULL over it. */
+                printlog(LOG_CRITICAL, "ZK: zookeeper_init failed while reconnecting. Errno is %d\n", errno);
+            } else {
+                /* NOTE(review): the expired handle is replaced without being
+                 * closed, so it is leaked. Whether zookeeper_close() may be
+                 * called from inside a watcher depends on the client library
+                 * version - confirm before adding it here. */
+                context->zkhandle = new_handle;
+            }
+        } else {
+            printlog(LOG_DEBUG, "ZK:Unhandled event zk_drl_watcher: type is %d, state is %d, path is %s\n", type, state, path);
+        }
+    } else if (type == ZOO_CHILD_EVENT) {
+        /* The list of child nodes in the group has changed. Re-read the
+         * group membership list and re-set the watch. */
+        zk_membership_change(zkhandle, path, context);
+    } else {
+        printlog(LOG_DEBUG, "ZK:Unhandled event zk_drl_watcher: type is %d, state is %d, path is %s\n", type, state, path);
+    }
+
+    pthread_mutex_unlock(&context->comm->lock);
+    pthread_rwlock_unlock(context->limiter_lock);
+}
+
+/*
+ * Process one received message `msg` from peer `remote`.
+ *
+ * comm:   communication state; its gossip value/weight/view and connected
+ *         flag may be updated, and comm->restart_function may be invoked.
+ * id:     passed through to send_ack().
+ * sock:   not used by this function.
+ * remote: per-peer state (incoming/outgoing sequence tracking, view,
+ *         view_confidence, awol counter) updated according to the message.
+ * msg:    the message; dispatch is on msg->type (ACK, MSG, UNSUREACK, NACK).
+ *         Any other type is ignored.
+ *
+ * Always returns 0.
+ */
+int zk_drl_recv(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) {
+ if (msg->type == ACK) {
+ /* If ACK was received then reset the awol count */
+ if (msg->view == comm->gossip.view && msg->seqno == remote->outgoing.next_seqno - 1) {
+ /* Ack for most recent message. Clear saved state. */
+ remote->outgoing.first_seqno = remote->outgoing.next_seqno;
+ remote->outgoing.saved_value = 0;
+ remote->outgoing.saved_weight = 0;
+ remote->awol = 0;
+
+ } else if (msg->view > comm->gossip.view) {
+ /* The peer is already in a newer view: adopt it, and since the peer
+  * itself told us about it, mark the peer as IN that view. */
+ printlog(LOG_DEBUG, "ZK:Received ack for newer view, restarting.\n");
+ comm->restart_function(comm, msg->view);
+ remote->view_confidence = IN;
+ remote->awol = 0;
+ }
+ /* Ignore ack if it isn't for most recent message or its from an old view. */
+ } else if (msg->type == MSG) {
+ if (msg->view == comm->gossip.view) {
+ /* Same view: compare the message's seqno range against the last
+  * seqno already seen from this peer. */
+ if (msg->min_seqno > remote->incoming.seen_seqno) {
+ /* Entirely new information */
+ remote->incoming.seen_seqno = msg->seqno;
+ remote->incoming.saved_value = msg->value;
+ remote->incoming.saved_weight = msg->weight;
+ comm->gossip.value += msg->value;
+ comm->gossip.weight += msg->weight;
+ send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+ remote->awol = 0;
+ }
+ else if (msg->seqno > remote->incoming.seen_seqno) {
+ /* Only some of the message is old news. */
+ /* Apply only the difference from what was saved from the previous
+  * message, so the already-counted part is not added twice. */
+ double diff_value = msg->value - remote->incoming.saved_value;
+ double diff_weight = msg->weight - remote->incoming.saved_weight;
+
+ remote->incoming.seen_seqno = msg->seqno;
+ remote->incoming.saved_value = msg->value;
+ remote->incoming.saved_weight = msg->weight;
+
+ comm->gossip.value += diff_value;
+ comm->gossip.weight += diff_weight;
+ send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+ remote->awol = 0;
+ }
+ else {
+ /* The entire message is old news. (Duplicate). */
+ /* Do nothing. */
+ }
+ } else if (msg->view > comm->gossip.view) {
+ /* Newer view: restart into it first, then account for this message
+  * in full and acknowledge it with the (now updated) view. */
+ printlog(LOG_DEBUG, "ZK:received message with a newer viewstamp, restarting.\n");
+ comm->restart_function(comm, msg->view);
+ remote->view_confidence = IN;
+
+ remote->incoming.seen_seqno = msg->seqno;
+ remote->incoming.saved_value = msg->value;
+ remote->incoming.saved_weight = msg->weight;
+ comm->gossip.value += msg->value;
+ comm->gossip.weight += msg->weight;
+ send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+ remote->awol = 0;
+ } else if (msg->view < comm->gossip.view) {
+ /* Older view: the message's value/weight are not applied. The reply
+  * type tells the sender what we believe about its membership in
+  * our view. */
+ printlog(LOG_DEBUG, "ZK:received a message with an older viewstamp.\n");
+ if (remote->view_confidence == IN) {
+ /* The sender is in the new view and doesn't know it yet. */
+ send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+ } else if (remote->view_confidence == UNSURE) {
+ /* We don't know if he's in or not. */
+ send_ack(id, remote, msg->seqno, UNSUREACK, comm->gossip.view);
+ } else if (remote->view_confidence == NOTIN) {
+ /* He's out of luck... */
+ send_ack(id, remote, msg->seqno, NACK, comm->gossip.view);
+ }
+ remote->awol = 0;
+ }
+ } else if (msg->type == UNSUREACK) {
+ /* We received an ack, but the ack sender was unsure whether or not
+ * we'll be a part of its new view. Can't do much here... */
+ /* Only the peer's recorded view/confidence are updated; no restart
+  * is triggered from this branch. */
+ if (msg->view > comm->gossip.view) {
+ remote->view = msg->view;
+ remote->view_confidence = IN;
+ remote->awol = 0;
+ printlog(LOG_DEBUG, "ZK:received an UNSUREACK for view %d\n", msg->view);
+ }
+ } else if (msg->type == NACK) {
+ /* Same bookkeeping as UNSUREACK for a newer view, and additionally
+  * comm->connected is cleared. */
+ if (msg->view > comm->gossip.view) {
+ remote->view = msg->view;
+ remote->view_confidence = IN;
+ remote->awol = 0;
+
+ comm->connected = 0;
+ printlog(LOG_DEBUG, "ZK:received a NACK for view %d\n", msg->view);
+ }
+ }
+
+ return 0;
+}
+
+/*
+ * Set up ZooKeeper-based membership for `comm`.
+ *
+ * Allocates a zkdrlcontext_t, stores it in comm->membership_state, installs
+ * the recv/send/restart callbacks on `comm`, and opens a ZooKeeper session
+ * to config->zk_host with zk_drl_watcher as the watcher.
+ *
+ * comm:    communication state to configure; comm->connected starts at 0.
+ * id:      identity id, used as the group znode name.
+ * limiter: supplies the limiter lock and the local address.
+ * config:  supplies zk_host. The pointer is stored, not copied.
+ *          NOTE(review): zk_drl_close() frees that pointer - confirm the
+ *          config does not also free it.
+ *
+ * Returns 0 on success, ENOMEM if the context cannot be allocated, or
+ * EINVAL if zookeeper_init() fails. On failure the context is released and
+ * comm->membership_state is NULL.
+ */
+int zk_drl_init(comm_t *comm, uint32_t id, limiter_t *limiter, ident_config *config) {
+    zkdrlcontext_t *context = NULL;
+
+    comm->connected = 0;
+
+    context = malloc(sizeof(*context));
+    if (context == NULL) {
+        return ENOMEM;
+    }
+
+    context->zk_host = config->zk_host;
+    context->limiter_lock = &limiter->limiter_lock;
+    context->comm = comm;
+    context->id = id;
+    context->local_addr = limiter->localaddr;
+    context->zkhandle = NULL;
+    comm->membership_state = context;
+
+    /* Install the callbacks before the watcher is registered: the watcher
+     * calls comm->restart_function, and it takes locks, so it is presumably
+     * run from another thread and could fire before this function returns. */
+    comm->recv_function = zk_drl_recv;
+    comm->send_function = send_udp_gossip;
+    comm->restart_function = zk_drl_restart;
+
+    printlog(LOG_DEBUG, "ZK: Calling zk init\n");
+
+    context->zkhandle = zookeeper_init(context->zk_host, zk_drl_watcher, 10000, NULL, context, 0);
+
+    if (context->zkhandle == NULL) {
+        printlog(LOG_CRITICAL, "ZK: docs say that this can fail, but they don't say why. :( Errno is %d\n", errno);
+        /* Release the context; zk_host still belongs to config here. */
+        comm->membership_state = NULL;
+        free(context);
+        return EINVAL;
+    }
+
+    return 0;
+}
+
+/*
+ * Tear down the ZooKeeper membership state attached to `comm`.
+ *
+ * Closes the ZooKeeper handle (if any), frees the stored zk_host string and
+ * the context, and clears comm->membership_state. Safe to call when no
+ * context is attached.
+ *
+ * NOTE(review): zk_host is the pointer taken from the config in
+ * zk_drl_init(); freeing it here assumes ownership was transferred.
+ *
+ * Always returns 0.
+ */
+int zk_drl_close(comm_t *comm) {
+    zkdrlcontext_t *context = (zkdrlcontext_t *) comm->membership_state;
+
+    /* Check before the first dereference, not after it. */
+    if (context == NULL) {
+        return 0;
+    }
+
+    if (context->zkhandle != NULL) {
+        int zoo_result = zookeeper_close(context->zkhandle);
+
+        if (zoo_result != ZOK) {
+            printlog(LOG_WARN, "ZK: zookeeper_close failed: %d\n", zoo_result);
+        }
+        context->zkhandle = NULL;
+    }
+
+    /* free(NULL) is a no-op, so no guard is needed. */
+    free(context->zk_host);
+    context->zk_host = NULL;
+
+    free(context);
+    comm->membership_state = NULL;
+
+    return 0;
+}
+
+#endif /* BUILD_ZOOKEEPER */