1 /* See the DRL-LICENSE file for this file's software license. */
13 #include "raterouter.h"
14 #include "ratetypes.h"
15 #include "drl_state.h"
16 #include "peer_comm.h"
/* Size, in bytes, of the path_buffer that zk_connected() hands to
 * zoo_create() to receive the name of the created node. */
22 #define PATH_BUFFER_LEN (64)
/* Looks up `path` in ZooKeeper with zoo_exists() and fills a local Stat.
 * Judging by the name and the int32_t return type this presumably returns
 * the node's cversion (child-list version) -- the local declarations and
 * the return statements are not part of this excerpt, so confirm there. */
24 static int32_t read_path_cversion(zhandle_t *zkhandle, const char *path) {
/* Start from an all-zero Stat before handing it to the client library. */
28 memset(&stat, 0, sizeof(struct Stat));
/* The watch argument is 0: this lookup does not register a watch. */
30 zoo_result = zoo_exists(zkhandle, path, 0, &stat);
/* Any result other than ZOK takes the error branch (its body is not shown
 * in this excerpt). */
32 if (zoo_result != ZOK) {
/* Re-reads the children of `path` from ZooKeeper and brings the
 * remote_limiters table in context->comm in line with that list:
 *   - if the version read for `path` is greater than the current gossip
 *     view, comm->restart_function is invoked with that version;
 *   - every known remote limiter is first marked UNREACHABLE / NOTIN;
 *   - every child that is not this node is then marked REACHABLE / IN;
 *   - finally comm->connected is set to 1.
 * The return statements are not part of this excerpt; callers compare the
 * result against ZOK.
 * NOTE(review): no release of `children` (e.g. deallocate_String_vector)
 * appears in the visible lines -- confirm the vector filled in by
 * zoo_get_children() is freed on every path. */
39 static int process_membership_change(zhandle_t *zkhandle, zkdrlcontext_t *context, const char *path) {
40 struct String_vector children;
/* view_before / view_after hold read_path_cversion(path) sampled before
 * and after the child list is fetched. They are initialised to different
 * values so the loop below runs at least once. */
41 int32_t view_before = 0;
42 int32_t view_after = view_before + 1; //Needs to be != to view_before
/* Repeat until both samples agree, i.e. the child list was fetched without
 * the version changing in between. */
46 while (view_before != view_after) {
47 view_before = read_path_cversion(zkhandle, path);
/* The watch argument is 1, so this call also (re)registers a child watch
 * on `path`. */
49 zoo_result = zoo_get_children(zkhandle, path, 1, &children);
50 if (zoo_result != ZOK) {
54 view_after = read_path_cversion(zkhandle, path);
/* A version newer than our gossip view means membership changed since the
 * view we are running in: hand the new view number to the restart hook. */
57 if (view_after > context->comm->gossip.view) {
58 printlog(LOG_DEBUG, "ZK:zookeeper watch says we need to restart with a new view.\n");
59 context->comm->restart_function(context->comm, view_after);
62 /* Clear the remote limiter list. This will be overwritten below for
63 * limiters that are found to be in the new view. */
64 for (i = 0; i < context->comm->remote_node_count; ++i) {
65 context->comm->remote_limiters[i].reachability = UNREACHABLE;
66 context->comm->remote_limiters[i].view = view_after;
67 context->comm->remote_limiters[i].view_confidence = NOTIN;
/* Walk the children; each child name is parsed as an unsigned decimal
 * address identifying one limiter. */
70 for (i = 0; i < children.count; ++i) {
71 remote_limiter_t *remote_limiter = NULL;
72 remote_node_t remote_node;
/* remote_node is used as a lookup key of sizeof(remote_node_t) bytes
 * below; presumably the map compares raw bytes, which would be why the
 * whole struct is zeroed first -- confirm against map_search(). */
74 memset(&remote_node, 0, sizeof(remote_node_t));
76 printlog(LOG_DEBUG, "ZK:children.data[%d] is %s\n", i, children.data[i]);
/* NOTE(review): the sscanf result is not checked; a child name that does
 * not start with a decimal number leaves addr at 0 from the memset. */
78 sscanf(children.data[i], "%u", &remote_node.addr);
/* Every peer is keyed with the fixed listen port, converted by htons(). */
79 remote_node.port = htons(LIMITER_LISTEN_PORT);
/* Our own entry in the child list is not a remote limiter; skip it. */
81 if (remote_node.addr != context->local_addr) {
82 printlog(LOG_DEBUG, "ZK:searching map for %u:%u\n", remote_node.addr, remote_node.port);
83 remote_limiter = map_search(context->comm->remote_node_map, &remote_node, sizeof(remote_node_t));
/* NOTE(review): a child that is not present in remote_node_map trips this
 * assert; if asserts are compiled out the next line dereferences NULL. */
84 assert(remote_limiter != NULL);
85 remote_limiter->reachability = REACHABLE;
86 remote_limiter->view_confidence = IN;
/* Logged for the child whose address equals local_addr (the `else` line
 * itself is not part of this excerpt). */
88 printlog(LOG_DEBUG, "ZK: %u is my own addr.\n", remote_node.addr);
/* Presumably read_path_cversion() signals failure with a negative value
 * -- confirm. */
92 assert(view_after >= 0);
/* Membership has been read successfully: flag the comm as connected. */
94 context->comm->connected = 1;
/* Handles a (re)connection to ZooKeeper:
 *   1. tries to create the group node "/<context->id>";
 *   2. tries to create this limiter's own node
 *      "/<context->id>/<context->local_addr>" with the ZOO_EPHEMERAL flag;
 *   3. runs process_membership_change() on the group node.
 * Creation failures are only logged. */
99 static void zk_connected(zhandle_t *zkhandle, zkdrlcontext_t *context) {
/* Output buffer passed to zoo_create(); its contents are not read in the
 * visible lines. */
101 char path_buffer[PATH_BUFFER_LEN];
104 printlog(LOG_DEBUG, "ZK:(Re)Connected to zookeeper.\n");
/* NOTE(review): the declaration of `path` is not part of this excerpt and
 * sprintf() is unbounded. The longest string written in this function,
 * "/%u/%u", needs 23 bytes including the terminator for 32-bit values --
 * confirm `path` is at least that large, or switch to snprintf(). */
106 sprintf(path, "/%u", context->id);
/* Group node: no data (NULL, NULL_LEN), open ACL, flags 0. */
108 zoo_result = zoo_create(zkhandle, path, NULL, NULL_LEN, &ZOO_OPEN_ACL_UNSAFE, 0, path_buffer, PATH_BUFFER_LEN);
110 if (zoo_result == ZOK) {
111 printlog(LOG_DEBUG, "ZK: created path %s\n", path);
113 //An error occurred. It was probably already there.
114 printlog(LOG_DEBUG, "ZK: error creating path %s: %d\n", path, zoo_result);
/* Our own membership node, named after the local address. */
117 sprintf(path, "/%u/%u", context->id, context->local_addr);
/* ZOO_EPHEMERAL: per the ZooKeeper API the node is removed when the
 * creating session ends, which is what makes it a liveness marker. */
119 zoo_result = zoo_create(zkhandle, path, NULL, NULL_LEN, &ZOO_READ_ACL_UNSAFE, ZOO_EPHEMERAL, path_buffer, PATH_BUFFER_LEN);
121 if (zoo_result == ZOK) {
122 printlog(LOG_DEBUG, "ZK: created path %s\n", path);
124 printlog(LOG_DEBUG, "ZK: error creating path %s: %d\n", path, zoo_result);
/* Point `path` back at the group node before reading its children. */
127 sprintf(path, "/%u", context->id);
129 zoo_result = process_membership_change(zkhandle, context, path);
/* A failure here is reported at LOG_WARN; no further handling is visible. */
131 if (zoo_result != ZOK) {
132 printlog(LOG_WARN, "ZK: process_membership_change failed?\n");
/* Handles loss of the ZooKeeper connection: logs the event and clears
 * comm->connected. `zkhandle` is not used in the visible lines. */
136 static void zk_disconnected(zhandle_t *zkhandle, zkdrlcontext_t *context) {
137 printlog(LOG_DEBUG, "ZK:Disconnected from zookeeper.\n");
/* Counterpart of the connected = 1 set by process_membership_change(). */
139 context->comm->connected = 0;
/* Handles a child-list change reported for `path`: logs it and re-runs
 * process_membership_change() on that path. What is done with zoo_result
 * afterwards is not part of this excerpt. */
142 static void zk_membership_change(zhandle_t *zkhandle, const char *path, zkdrlcontext_t *context) {
145 printlog(LOG_DEBUG, "ZK:zookeeper child list changed.\n");
147 zoo_result = process_membership_change(zkhandle, context, path);
/* Restarts the gossip state of `comm` in view `view_number`.
 * zk_drl_init() installs this function as comm->restart_function.
 *
 * comm        - communication state to reset.
 * view_number - the view to switch to. */
150 void zk_drl_restart(comm_t *comm, int32_t view_number) {
/* Local gossip state starts over from this node's own rate with weight
 * 1.0, stamped with the new view. */
153 comm->gossip.value = comm->local_rate;
154 comm->gossip.weight = 1.0;
155 comm->gossip.view = view_number;
/* Only peers still recorded in an older view are reset; peers already at
 * view_number (or later) keep their state. */
157 for (i = 0; i < comm->remote_node_count; ++i) {
158 if (comm->remote_limiters[i].view < view_number) {
159 comm->remote_limiters[i].rate = 0;
/* Drop the per-peer incoming and outgoing bookkeeping from the old view. */
160 memset(&comm->remote_limiters[i].incoming, 0, sizeof(in_neighbor_t));
161 memset(&comm->remote_limiters[i].outgoing, 0, sizeof(out_neighbor_t));
162 comm->remote_limiters[i].view = view_number;
/* Whether this peer is in the new view is not decided here, so it is
 * marked UNSURE. */
163 comm->remote_limiters[i].view_confidence = UNSURE;
167 printlog(LOG_DEBUG, "ZK: Changing view to %d\n", view_number);
/* Watcher callback registered with zookeeper_init(). Dispatches session
 * events (connected / connecting / expired) and child events to the
 * handlers above; anything else is only logged.
 *
 * context_ptr is the zkdrlcontext_t that was passed to zookeeper_init().
 *
 * Locking: takes context->limiter_lock for reading, then
 * context->comm->lock, and releases them in the reverse order at the end. */
170 static void zk_drl_watcher(zhandle_t *zkhandle, int type, int state, const char *path, void *context_ptr) {
171 zkdrlcontext_t *context = (zkdrlcontext_t *) context_ptr;
173 pthread_rwlock_rdlock(context->limiter_lock);
174 pthread_mutex_lock(&context->comm->lock);
176 if (type == ZOO_SESSION_EVENT) {
177 if (state == ZOO_CONNECTED_STATE) {
178 /* We're newly connected - set that watch! */
179 zk_connected(zkhandle, context);
/* ZOO_CONNECTING_STATE is handled as "connection lost". */
180 } else if (state == ZOO_CONNECTING_STATE) {
181 /* We're no longer connected. Do something safe. */
182 zk_disconnected(zkhandle, context);
183 } else if (state == ZOO_EXPIRED_SESSION_STATE) {
184 printlog(LOG_DEBUG, "ZK:zookeeper session expired - reconnecting.\n");
/* Open a fresh session with the same host, watcher, timeout and context
 * as zk_drl_init() uses.
 * NOTE(review): the result is not checked for NULL here, and the expired
 * handle (`zkhandle`) is not closed in the visible lines -- confirm it is
 * released somewhere. */
185 context->zkhandle = zookeeper_init(context->zk_host, zk_drl_watcher, 10000, NULL, context, 0);
/* Any other session state is only logged. */
187 printlog(LOG_DEBUG, "ZK:Unhandled event zk_drl_watcher: type is %d, state is %d, path is %s\n", type, state, path);
189 } else if (type == ZOO_CHILD_EVENT) {
190 /* The list of child nodes in the group has changed. Re-read the
191 * group membership list and re-set the watch. */
192 zk_membership_change(zkhandle, path, context);
/* Any other event type is only logged. */
194 printlog(LOG_DEBUG, "ZK:Unhandled event zk_drl_watcher: type is %d, state is %d, path is %s\n", type, state, path);
197 pthread_mutex_unlock(&context->comm->lock);
198 pthread_rwlock_unlock(context->limiter_lock);
/* Processes one message `msg` received from `remote`. Behaviour depends on
 * msg->type (ACK, MSG, UNSUREACK, NACK) and on how msg->view compares with
 * the local comm->gossip.view. zk_drl_init() installs this function as
 * comm->recv_function.
 *
 * comm   - local communication / gossip state, updated in place.
 * id     - passed through to send_ack().
 * sock   - not used in the visible lines.
 * remote - per-peer state of the sender, updated in place.
 * msg    - the received message.
 *
 * The return statements are not part of this excerpt. */
201 int zk_drl_recv(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) {
202 if (msg->type == ACK) {
203 /* If ACK was received then reset the awol count */
/* Same view, and the acked seqno is the one just before next_seqno. */
204 if (msg->view == comm->gossip.view && msg->seqno == remote->outgoing.next_seqno - 1) {
205 /* Ack for most recent message. Clear saved state. */
206 remote->outgoing.first_seqno = remote->outgoing.next_seqno;
207 remote->outgoing.saved_value = 0;
208 remote->outgoing.saved_weight = 0;
/* The acker is already in a newer view: move to it and record that this
 * peer is a member of it. */
211 } else if (msg->view > comm->gossip.view) {
212 printlog(LOG_DEBUG, "ZK:Received ack for newer view, restarting.\n");
213 comm->restart_function(comm, msg->view);
214 remote->view_confidence = IN;
217 /* Ignore ack if it isn't for most recent message or its from an old view. */
218 } else if (msg->type == MSG) {
/* Same view: three cases, decided by comparing the message's sequence
 * numbers with the last one seen from this peer. */
219 if (msg->view == comm->gossip.view) {
220 if (msg->min_seqno > remote->incoming.seen_seqno) {
221 /* Entirely new information */
/* Remember this message as the latest seen and add its full value and
 * weight to the local gossip state, then acknowledge it. */
222 remote->incoming.seen_seqno = msg->seqno;
223 remote->incoming.saved_value = msg->value;
224 remote->incoming.saved_weight = msg->weight;
225 comm->gossip.value += msg->value;
226 comm->gossip.weight += msg->weight;
227 send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
230 else if (msg->seqno > remote->incoming.seen_seqno) {
231 /* Only some of the message is old news. */
/* Add only the difference from the amounts saved for this peer. */
232 double diff_value = msg->value - remote->incoming.saved_value;
233 double diff_weight = msg->weight - remote->incoming.saved_weight;
235 remote->incoming.seen_seqno = msg->seqno;
236 remote->incoming.saved_value = msg->value;
237 remote->incoming.saved_weight = msg->weight;
239 comm->gossip.value += diff_value;
240 comm->gossip.weight += diff_weight;
241 send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
245 /* The entire message is old news. (Duplicate). */
/* The sender is in a newer view: restart into it, mark the sender IN, and
 * then take this message as new information in that view. */
248 } else if (msg->view > comm->gossip.view) {
249 printlog(LOG_DEBUG, "ZK:received message with a newer viewstamp, restarting.\n");
250 comm->restart_function(comm, msg->view);
251 remote->view_confidence = IN;
253 remote->incoming.seen_seqno = msg->seqno;
254 remote->incoming.saved_value = msg->value;
255 remote->incoming.saved_weight = msg->weight;
256 comm->gossip.value += msg->value;
257 comm->gossip.weight += msg->weight;
258 send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
/* The sender is behind: its value is not merged. The reply type depends on
 * what we know about the sender's membership in our view, and the reply
 * carries our (newer) view number. */
260 } else if (msg->view < comm->gossip.view) {
261 printlog(LOG_DEBUG, "ZK:received a message with an older viewstamp.\n");
262 if (remote->view_confidence == IN) {
263 /* The sender is in the new view and doesn't know it yet. */
264 send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
265 } else if (remote->view_confidence == UNSURE) {
266 /* We don't know if he's in or not. */
267 send_ack(id, remote, msg->seqno, UNSUREACK, comm->gossip.view);
268 } else if (remote->view_confidence == NOTIN) {
269 /* He's out of luck... */
270 send_ack(id, remote, msg->seqno, NACK, comm->gossip.view);
274 } else if (msg->type == UNSUREACK) {
275 /* We received an ack, but the ack sender was unsure whether or not
276 * we'll be a part of its new view. Can't do much here... */
/* If the sender's view is newer than ours, record it for that peer and
 * mark the peer IN; no restart is triggered in the visible lines. */
277 if (msg->view > comm->gossip.view) {
278 remote->view = msg->view;
279 remote->view_confidence = IN;
281 printlog(LOG_DEBUG, "ZK:received an UNSUREACK for view %d\n", msg->view);
283 } else if (msg->type == NACK) {
/* Same bookkeeping as for UNSUREACK in the visible lines. */
284 if (msg->view > comm->gossip.view) {
285 remote->view = msg->view;
286 remote->view_confidence = IN;
290 printlog(LOG_DEBUG, "ZK:received a NACK for view %d\n", msg->view);
/* Sets up ZooKeeper-based membership for `comm`: allocates a
 * zkdrlcontext_t, fills it from `limiter` and `config`, stores it in
 * comm->membership_state, opens a ZooKeeper session with zk_drl_watcher as
 * the watcher, and installs the recv/send/restart callbacks.
 *
 * The return statements are not part of this excerpt.
 * NOTE(review): no assignment of context->id from `id` is visible here,
 * although zk_connected() reads context->id -- confirm it is set. */
297 int zk_drl_init(comm_t *comm, uint32_t id, limiter_t *limiter, ident_config *config) {
298 zkdrlcontext_t *context = NULL;
301 if ((context = malloc(sizeof(zkdrlcontext_t))) == NULL) {
/* The host string pointer is stored, not copied. zk_drl_close() later
 * free()s context->zk_host, so ownership of config->zk_host effectively
 * passes to the context -- verify the config does not free it as well. */
305 context->zk_host = config->zk_host;
/* The watcher takes this lock (for reading) before touching comm. */
306 context->limiter_lock = &limiter->limiter_lock;
307 context->comm = comm;
309 context->local_addr = limiter->localaddr;
/* zk_drl_close() retrieves the context from here. */
310 comm->membership_state = context;
312 printlog(LOG_DEBUG, "ZK: Calling zk init\n");
/* `context` is passed as the watcher context, which is what
 * zk_drl_watcher() receives as context_ptr. */
314 context->zkhandle = zookeeper_init(context->zk_host, zk_drl_watcher, 10000, NULL, context, 0);
316 if (context->zkhandle == NULL) {
317 printlog(LOG_CRITICAL, "ZK: docs say that this can fail, but they don't say why. :( Errno is %d\n", errno);
/* Hooks through which the rest of the limiter drives this module. */
321 comm->recv_function = zk_drl_recv;
322 comm->send_function = send_udp_gossip;
323 comm->restart_function = zk_drl_restart;
/* Tears down the ZooKeeper membership state of `comm`: closes the
 * ZooKeeper handle, frees the stored host string and clears
 * comm->membership_state. The return statement and any free of the context
 * itself are not part of this excerpt. */
328 int zk_drl_close(comm_t *comm) {
329 zkdrlcontext_t *context = (zkdrlcontext_t *) comm->membership_state;
/* NOTE(review): `context` is dereferenced here before the NULL check two
 * lines below; if membership_state can be NULL this crashes before the
 * check is reached. The check should come first. */
331 zookeeper_close(context->zkhandle);
333 if (context && context->zk_host) {
/* zk_host was taken from config->zk_host in zk_drl_init(). */
334 free(context->zk_host);
335 context->zk_host = NULL;
340 comm->membership_state = NULL;
346 #endif /* BUILD_ZOOKEEPER */