/* See the DRL-LICENSE file for this file's software license. */

#ifdef BUILD_ZOOKEEPER

/* NOTE(review): the system header names were missing from the source this was
 * rebuilt from; this list is reconstructed from the symbols used below. The
 * ZooKeeper client declarations (zhandle_t, zoo_*, struct Stat,
 * struct String_vector, deallocate_String_vector) are assumed to arrive via
 * zk_drl.h, since zkdrlcontext_t holds a zhandle_t — confirm. */
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "raterouter.h"
#include "ratetypes.h"
#include "drl_state.h"
#include "peer_comm.h"
#include "zk_drl.h"
#include "logging.h"

/* Value length handed to zoo_create() together with a NULL value pointer. */
#define NULL_LEN (-1)
/* Buffer size for formatting znode paths ("/<id>" and "/<id>/<addr>"). */
#define PATH_LEN (64)
/* Buffer size for the created-path output argument of zoo_create(). */
#define PATH_BUFFER_LEN (64)
/* recv_timeout (session timeout, milliseconds) passed to zookeeper_init(). */
#define ZK_SESSION_TIMEOUT_MS (10000)

/*
 * Read the child-version counter (cversion) of the znode at `path`.
 *
 * On success stores the counter in *cversion and returns ZOK. Otherwise
 * returns the zoo_exists() error code (e.g. ZNONODE) and leaves *cversion
 * untouched.
 */
static int read_path_cversion(zhandle_t *zkhandle, const char *path, int32_t *cversion) {
    struct Stat stat;
    int zoo_result;

    memset(&stat, 0, sizeof(stat));
    zoo_result = zoo_exists(zkhandle, path, 0, &stat);
    if (zoo_result == ZOK) {
        *cversion = stat.cversion;
    }
    return zoo_result;
}

/*
 * Re-read group membership from the children of `path` and update the remote
 * limiter table to match.
 *
 * The parent's cversion is used as the view number. The child list is read
 * between two cversion reads and retried until both agree, so the list
 * corresponds to a single view. zoo_get_children() is called with watch=1,
 * which re-arms the child watch for the next change.
 *
 * If the new view is newer than comm->gossip.view, comm->restart_function is
 * invoked. Every remote limiter is then marked UNREACHABLE/NOTIN for the new
 * view, except those listed as children, which are marked REACHABLE/IN.
 * On success sets comm->connected = 1.
 *
 * Returns ZOK, or the ZooKeeper error code of the first failing call.
 */
static int process_membership_change(zhandle_t *zkhandle, zkdrlcontext_t *context,
                                     const char *path) {
    struct String_vector children;
    int32_t view_before = 0;
    int32_t view_after = 0;
    int zoo_result;
    int i;

    memset(&children, 0, sizeof(children));

    do {
        /* Drop the list from a previous, inconsistent attempt (no-op when empty). */
        deallocate_String_vector(&children);

        zoo_result = read_path_cversion(zkhandle, path, &view_before);
        if (zoo_result != ZOK) {
            return zoo_result;
        }

        zoo_result = zoo_get_children(zkhandle, path, 1, &children);
        if (zoo_result != ZOK) {
            deallocate_String_vector(&children);
            return zoo_result;
        }

        zoo_result = read_path_cversion(zkhandle, path, &view_after);
        if (zoo_result != ZOK) {
            deallocate_String_vector(&children);
            return zoo_result;
        }
    } while (view_before != view_after);

    if (view_after > context->comm->gossip.view) {
        printlog(LOG_DEBUG, "ZK:zookeeper watch says we need to restart with a new view.\n");
        context->comm->restart_function(context->comm, view_after);
    }

    /* Clear the remote limiter list. This will be overwritten below for
     * limiters that are found to be in the new view. */
    for (i = 0; i < context->comm->remote_node_count; ++i) {
        context->comm->remote_limiters[i].reachability = UNREACHABLE;
        context->comm->remote_limiters[i].view = view_after;
        context->comm->remote_limiters[i].view_confidence = NOTIN;
    }

    for (i = 0; i < children.count; ++i) {
        remote_limiter_t *remote_limiter;
        remote_node_t remote_node;

        memset(&remote_node, 0, sizeof(remote_node));
        printlog(LOG_DEBUG, "ZK:children.data[%d] is %s\n", i, children.data[i]);

        /* Child names are the members' addresses in decimal. */
        if (sscanf(children.data[i], "%u", &remote_node.addr) != 1) {
            printlog(LOG_WARN, "ZK: ignoring unparsable group member %s\n", children.data[i]);
            continue;
        }
        remote_node.port = htons(LIMITER_LISTEN_PORT);

        if (remote_node.addr == context->local_addr) {
            printlog(LOG_DEBUG, "ZK: %u is my own addr.\n", remote_node.addr);
            continue;
        }

        printlog(LOG_DEBUG, "ZK:searching map for %u:%u\n", remote_node.addr, remote_node.port);
        remote_limiter = map_search(context->comm->remote_node_map, &remote_node,
                                    sizeof(remote_node_t));
        if (remote_limiter == NULL) {
            /* A member we have no local entry for; skip it rather than
             * dereferencing NULL. */
            printlog(LOG_WARN, "ZK: group member %u is not a known limiter, ignoring.\n",
                     remote_node.addr);
            continue;
        }
        remote_limiter->reachability = REACHABLE;
        remote_limiter->view_confidence = IN;
    }

    deallocate_String_vector(&children);

    assert(view_after >= 0);
    context->comm->connected = 1;
    return ZOK;
}

/*
 * Session (re)established: ensure the persistent group node "/<id>" exists,
 * register this limiter as the ephemeral child "/<id>/<local_addr>", then
 * load the current membership (which also sets the child watch).
 * zoo_create() failures are only logged; the group node usually exists already.
 */
static void zk_connected(zhandle_t *zkhandle, zkdrlcontext_t *context) {
    char path[PATH_LEN];
    char path_buffer[PATH_BUFFER_LEN];
    int zoo_result;

    printlog(LOG_DEBUG, "ZK:(Re)Connected to zookeeper.\n");

    snprintf(path, sizeof(path), "/%u", context->id);
    zoo_result = zoo_create(zkhandle, path, NULL, NULL_LEN, &ZOO_OPEN_ACL_UNSAFE, 0,
                            path_buffer, PATH_BUFFER_LEN);
    if (zoo_result == ZOK) {
        printlog(LOG_DEBUG, "ZK: created path %s\n", path);
    } else {
        /* An error occurred. It was probably already there. */
        printlog(LOG_DEBUG, "ZK: error creating path %s: %d\n", path, zoo_result);
    }

    snprintf(path, sizeof(path), "/%u/%u", context->id, context->local_addr);
    zoo_result = zoo_create(zkhandle, path, NULL, NULL_LEN, &ZOO_READ_ACL_UNSAFE, ZOO_EPHEMERAL,
                            path_buffer, PATH_BUFFER_LEN);
    if (zoo_result == ZOK) {
        printlog(LOG_DEBUG, "ZK: created path %s\n", path);
    } else {
        printlog(LOG_DEBUG, "ZK: error creating path %s: %d\n", path, zoo_result);
    }

    snprintf(path, sizeof(path), "/%u", context->id);
    zoo_result = process_membership_change(zkhandle, context, path);
    if (zoo_result != ZOK) {
        printlog(LOG_WARN, "ZK: process_membership_change failed?\n");
    }
}

/* Connection to ZooKeeper lost: mark the comm as not connected. */
static void zk_disconnected(zhandle_t *zkhandle, zkdrlcontext_t *context) {
    printlog(LOG_DEBUG, "ZK:Disconnected from zookeeper.\n");
    context->comm->connected = 0;
}

/* Child watch fired on `path`: re-read membership (re-arming the watch). */
static void zk_membership_change(zhandle_t *zkhandle, const char *path, zkdrlcontext_t *context) {
    int zoo_result;

    printlog(LOG_DEBUG, "ZK:zookeeper child list changed.\n");
    zoo_result = process_membership_change(zkhandle, context, path);
    if (zoo_result != ZOK) {
        printlog(LOG_WARN, "ZK: process_membership_change failed?\n");
    }
}

/*
 * Switch this limiter's gossip state to view `view_number`.
 *
 * Resets the local gossip value/weight to (local_rate, 1.0). For every remote
 * limiter whose recorded view is older than `view_number`, zeroes its rate and
 * incoming/outgoing state and marks it UNSURE in the new view.
 */
void zk_drl_restart(comm_t *comm, int32_t view_number) {
    int i;

    comm->gossip.value = comm->local_rate;
    comm->gossip.weight = 1.0;
    comm->gossip.view = view_number;

    for (i = 0; i < comm->remote_node_count; ++i) {
        if (comm->remote_limiters[i].view < view_number) {
            comm->remote_limiters[i].rate = 0;
            memset(&comm->remote_limiters[i].incoming, 0, sizeof(in_neighbor_t));
            memset(&comm->remote_limiters[i].outgoing, 0, sizeof(out_neighbor_t));
            comm->remote_limiters[i].view = view_number;
            comm->remote_limiters[i].view_confidence = UNSURE;
        }
    }

    printlog(LOG_DEBUG, "ZK: Changing view to %d\n", view_number);
}

/*
 * ZooKeeper watcher registered with zookeeper_init(); `context_ptr` is the
 * zkdrlcontext_t passed there. Takes the limiter read lock, then the comm
 * mutex, for the whole dispatch.
 */
static void zk_drl_watcher(zhandle_t *zkhandle, int type, int state, const char *path,
                           void *context_ptr) {
    zkdrlcontext_t *context = (zkdrlcontext_t *) context_ptr;

    pthread_rwlock_rdlock(context->limiter_lock);
    pthread_mutex_lock(&context->comm->lock);

    if (type == ZOO_SESSION_EVENT) {
        if (state == ZOO_CONNECTED_STATE) {
            /* We're newly connected - set that watch! */
            zk_connected(zkhandle, context);
        } else if (state == ZOO_CONNECTING_STATE) {
            /* We're no longer connected. Do something safe. */
            zk_disconnected(zkhandle, context);
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {
            printlog(LOG_DEBUG, "ZK:zookeeper session expired - reconnecting.\n");
            /* The session (and our ephemeral member node) is gone. */
            context->comm->connected = 0;
            /* NOTE(review): the expired handle is never zookeeper_close()d and
             * so leaks; confirm closing it from inside its own watcher is safe
             * with the client library before adding that. */
            context->zkhandle = zookeeper_init(context->zk_host, zk_drl_watcher,
                                               ZK_SESSION_TIMEOUT_MS, NULL, context, 0);
            if (context->zkhandle == NULL) {
                printlog(LOG_CRITICAL, "ZK: failed to recreate zookeeper handle. Errno is %d\n",
                         errno);
            }
        } else {
            printlog(LOG_DEBUG, "ZK:Unhandled event zk_drl_watcher: type is %d, state is %d, path is %s\n",
                     type, state, path);
        }
    } else if (type == ZOO_CHILD_EVENT) {
        /* The list of child nodes in the group has changed. Re-read the
         * group membership list and re-set the watch. */
        zk_membership_change(zkhandle, path, context);
    } else {
        printlog(LOG_DEBUG, "ZK:Unhandled event zk_drl_watcher: type is %d, state is %d, path is %s\n",
                 type, state, path);
    }

    pthread_mutex_unlock(&context->comm->lock);
    pthread_rwlock_unlock(context->limiter_lock);
}

/*
 * Handle one gossip message `msg` received from `remote`.
 *
 * ACK:       clears saved outgoing state when it acknowledges the latest
 *            message of the current view; restarts into a newer view.
 * MSG:       folds the sender's value/weight into comm->gossip (only the part
 *            not already seen, by seqno) and acks it; restarts into a newer
 *            view; answers old-view senders with ACK/UNSUREACK/NACK depending
 *            on remote->view_confidence.
 * UNSUREACK/
 * NACK:      record a newer view for the sender (NACK also clears
 *            comm->connected).
 *
 * `sock` is unused. Always returns 0.
 */
int zk_drl_recv(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) {
    if (msg->type == ACK) {
        /* If ACK was received then reset the awol count */
        if (msg->view == comm->gossip.view &&
            msg->seqno == remote->outgoing.next_seqno - 1) {
            /* Ack for most recent message. Clear saved state. */
            remote->outgoing.first_seqno = remote->outgoing.next_seqno;
            remote->outgoing.saved_value = 0;
            remote->outgoing.saved_weight = 0;
            remote->awol = 0;
        } else if (msg->view > comm->gossip.view) {
            printlog(LOG_DEBUG, "ZK:Received ack for newer view, restarting.\n");
            comm->restart_function(comm, msg->view);
            remote->view_confidence = IN;
            remote->awol = 0;
        }
        /* Ignore ack if it isn't for most recent message or its from an old view. */
    } else if (msg->type == MSG) {
        if (msg->view == comm->gossip.view) {
            if (msg->min_seqno > remote->incoming.seen_seqno) {
                /* Entirely new information */
                remote->incoming.seen_seqno = msg->seqno;
                remote->incoming.saved_value = msg->value;
                remote->incoming.saved_weight = msg->weight;
                comm->gossip.value += msg->value;
                comm->gossip.weight += msg->weight;
                send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
                remote->awol = 0;
            } else if (msg->seqno > remote->incoming.seen_seqno) {
                /* Only some of the message is old news: apply just the delta
                 * relative to what was saved from the previous message. */
                double diff_value = msg->value - remote->incoming.saved_value;
                double diff_weight = msg->weight - remote->incoming.saved_weight;

                remote->incoming.seen_seqno = msg->seqno;
                remote->incoming.saved_value = msg->value;
                remote->incoming.saved_weight = msg->weight;
                comm->gossip.value += diff_value;
                comm->gossip.weight += diff_weight;
                send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
                remote->awol = 0;
            } else {
                /* The entire message is old news (duplicate). Do nothing. */
            }
        } else if (msg->view > comm->gossip.view) {
            printlog(LOG_DEBUG, "ZK:received message with a newer viewstamp, restarting.\n");
            comm->restart_function(comm, msg->view);
            remote->view_confidence = IN;
            remote->incoming.seen_seqno = msg->seqno;
            remote->incoming.saved_value = msg->value;
            remote->incoming.saved_weight = msg->weight;
            comm->gossip.value += msg->value;
            comm->gossip.weight += msg->weight;
            send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
            remote->awol = 0;
        } else if (msg->view < comm->gossip.view) {
            printlog(LOG_DEBUG, "ZK:received a message with an older viewstamp.\n");
            if (remote->view_confidence == IN) {
                /* The sender is in the new view and doesn't know it yet. */
                send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
            } else if (remote->view_confidence == UNSURE) {
                /* We don't know if he's in or not. */
                send_ack(id, remote, msg->seqno, UNSUREACK, comm->gossip.view);
            } else if (remote->view_confidence == NOTIN) {
                /* He's out of luck... */
                send_ack(id, remote, msg->seqno, NACK, comm->gossip.view);
            }
            remote->awol = 0;
        }
    } else if (msg->type == UNSUREACK) {
        /* We received an ack, but the ack sender was unsure whether or not
         * we'll be a part of its new view. Can't do much here... */
        if (msg->view > comm->gossip.view) {
            remote->view = msg->view;
            remote->view_confidence = IN;
            remote->awol = 0;
            printlog(LOG_DEBUG, "ZK:received an UNSUREACK for view %d\n", msg->view);
        }
    } else if (msg->type == NACK) {
        if (msg->view > comm->gossip.view) {
            remote->view = msg->view;
            remote->view_confidence = IN;
            remote->awol = 0;
            comm->connected = 0;
            printlog(LOG_DEBUG, "ZK:received a NACK for view %d\n", msg->view);
        }
    }

    return 0;
}

/*
 * Set up ZooKeeper-based membership for `comm`.
 *
 * Allocates the zkdrlcontext_t (stored in comm->membership_state), installs
 * the zk_drl recv/send/restart callbacks, and starts the ZooKeeper client.
 * The context keeps the config->zk_host pointer; zk_drl_close() frees it.
 *
 * Returns 0 on success, ENOMEM if the context cannot be allocated, or EINVAL
 * if zookeeper_init() fails (in which case the context is released and
 * comm->membership_state is NULL).
 */
int zk_drl_init(comm_t *comm, uint32_t id, limiter_t *limiter, ident_config *config) {
    zkdrlcontext_t *context;

    comm->connected = 0;

    context = calloc(1, sizeof(*context));
    if (context == NULL) {
        return ENOMEM;
    }

    context->zk_host = config->zk_host;
    context->limiter_lock = &limiter->limiter_lock;
    context->comm = comm;
    context->id = id;
    context->local_addr = limiter->localaddr;

    /* Install the callbacks before starting the client: the watcher passed to
     * zookeeper_init() reaches comm->restart_function (via
     * process_membership_change) and may run before zookeeper_init returns. */
    comm->recv_function = zk_drl_recv;
    comm->send_function = send_udp_gossip;
    comm->restart_function = zk_drl_restart;
    comm->membership_state = context;

    printlog(LOG_DEBUG, "ZK: Calling zk init\n");
    context->zkhandle = zookeeper_init(context->zk_host, zk_drl_watcher, ZK_SESSION_TIMEOUT_MS,
                                       NULL, context, 0);
    if (context->zkhandle == NULL) {
        printlog(LOG_CRITICAL, "ZK: docs say that this can fail, but they don't say why. :( Errno is %d\n",
                 errno);
        comm->membership_state = NULL;
        free(context);
        return EINVAL;
    }

    return 0;
}

/*
 * Tear down what zk_drl_init() set up: close the ZooKeeper handle, free the
 * zk_host string and the context, and clear comm->membership_state.
 * Safe to call when comm->membership_state is NULL. Always returns 0.
 */
int zk_drl_close(comm_t *comm) {
    zkdrlcontext_t *context = (zkdrlcontext_t *) comm->membership_state;

    if (context == NULL) {
        return 0;
    }

    if (context->zkhandle != NULL) {
        zookeeper_close(context->zkhandle);
        context->zkhandle = NULL;
    }

    free(context->zk_host);
    context->zk_host = NULL;

    free(context);
    comm->membership_state = NULL;

    return 0;
}

#endif /* BUILD_ZOOKEEPER */