Updates to autotools for library detection
[distributedratelimiting.git] / drl / zk_drl.c
1 /* See the DRL-LICENSE file for this file's software license. */
2
3 #ifdef BUILD_ZOOKEEPER
4
5 #include <arpa/inet.h>
6 #include <assert.h>
7 #include <inttypes.h>
8 #include <errno.h>
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <unistd.h>
12
13 #include "raterouter.h"
14 #include "ratetypes.h"
15 #include "drl_state.h"
16 #include "peer_comm.h"
17 #include "zk_drl.h"
18 #include "logging.h"
19
20 #define NULL_LEN (-1)
21 #define PATH_LEN (64)
22 #define PATH_BUFFER_LEN (64)
23
24 static int32_t read_path_cversion(zhandle_t *zkhandle, const char *path) {
25     struct Stat stat;
26     int zoo_result;
27
28     memset(&stat, 0, sizeof(struct Stat));
29
30     zoo_result = zoo_exists(zkhandle, path, 0, &stat);
31
32     if (zoo_result != ZOK) {
33         return -1;
34     }
35
36     return stat.cversion;
37 }
38
39 static int process_membership_change(zhandle_t *zkhandle, zkdrlcontext_t *context, const char *path) {
40     struct String_vector children;
41     int32_t view_before = 0;
42     int32_t view_after = view_before + 1; //Needs to be != to view_before
43     int zoo_result = 0;
44     int i;
45
46     while (view_before != view_after) {
47         view_before = read_path_cversion(zkhandle, path);
48
49         zoo_result = zoo_get_children(zkhandle, path, 1, &children);
50         if (zoo_result != ZOK) {
51             return zoo_result;
52         }
53
54         view_after = read_path_cversion(zkhandle, path);
55     }
56
57     if (view_after > context->comm->gossip.view) {
58         printlog(LOG_DEBUG, "ZK:zookeeper watch says we need to restart with a new view.\n");
59         context->comm->restart_function(context->comm, view_after);
60     }
61
62     /* Clear the remote limiter list.  This will be overwritten below for
63      * limiters that are found to be in the new view. */
64     for (i = 0; i < context->comm->remote_node_count; ++i) {
65         context->comm->remote_limiters[i].reachability = UNREACHABLE;
66         context->comm->remote_limiters[i].view = view_after;
67         context->comm->remote_limiters[i].view_confidence = NOTIN;
68     }
69
70     for (i = 0; i < children.count; ++i) {
71         remote_limiter_t *remote_limiter = NULL;
72         remote_node_t remote_node;
73
74         memset(&remote_node, 0, sizeof(remote_node_t));
75
76         printlog(LOG_DEBUG, "ZK:children.data[%d] is %s\n", i, children.data[i]);
77
78         sscanf(children.data[i], "%u", &remote_node.addr);
79         remote_node.port = htons(LIMITER_LISTEN_PORT);
80
81         if (remote_node.addr != context->local_addr) {
82             printlog(LOG_DEBUG, "ZK:searching map for %u:%u\n", remote_node.addr, remote_node.port);
83             remote_limiter = map_search(context->comm->remote_node_map, &remote_node, sizeof(remote_node_t));
84             assert(remote_limiter != NULL);
85             remote_limiter->reachability = REACHABLE;
86             remote_limiter->view_confidence = IN;
87         } else {
88             printlog(LOG_DEBUG, "ZK: %u is my own addr.\n", remote_node.addr);
89         }
90     }
91     
92     assert(view_after >= 0);
93
94     context->comm->connected = 1;
95
96     return ZOK;
97 }
98
99 static void zk_connected(zhandle_t *zkhandle, zkdrlcontext_t *context) {
100     char path[PATH_LEN];
101     char path_buffer[PATH_BUFFER_LEN];
102     int zoo_result = 0;
103
104     printlog(LOG_DEBUG, "ZK:(Re)Connected to zookeeper.\n");
105
106     sprintf(path, "/%u", context->id);
107
108     zoo_result = zoo_create(zkhandle, path, NULL, NULL_LEN, &ZOO_OPEN_ACL_UNSAFE, 0, path_buffer, PATH_BUFFER_LEN);
109
110     if (zoo_result == ZOK) {
111         printlog(LOG_DEBUG, "ZK: created path %s\n", path);
112     } else {
113         //An error occurred.  It was probably already there.
114         printlog(LOG_DEBUG, "ZK: error creating path %s: %d\n", path, zoo_result);
115     }
116
117     sprintf(path, "/%u/%u", context->id, context->local_addr);
118     
119     zoo_result = zoo_create(zkhandle, path, NULL, NULL_LEN, &ZOO_READ_ACL_UNSAFE, ZOO_EPHEMERAL, path_buffer, PATH_BUFFER_LEN);
120
121     if (zoo_result == ZOK) {
122         printlog(LOG_DEBUG, "ZK: created path %s\n", path);
123     } else {
124         printlog(LOG_DEBUG, "ZK: error creating path %s: %d\n", path, zoo_result);
125     }
126
127     sprintf(path, "/%u", context->id);
128
129     zoo_result = process_membership_change(zkhandle, context, path);
130
131     if (zoo_result != ZOK) {
132         printlog(LOG_WARN, "ZK: process_membership_change failed?\n");
133     }
134 }
135
136 static void zk_disconnected(zhandle_t *zkhandle, zkdrlcontext_t *context) {
137     printlog(LOG_DEBUG, "ZK:Disconnected from zookeeper.\n");
138
139     context->comm->connected = 0;
140 }
141
142 static void zk_membership_change(zhandle_t *zkhandle, const char *path, zkdrlcontext_t *context) {
143     int zoo_result = 0;
144
145     printlog(LOG_DEBUG, "ZK:zookeeper child list changed.\n");
146
147     zoo_result = process_membership_change(zkhandle, context, path);
148 }
149
150 void zk_drl_restart(comm_t *comm, int32_t view_number) {
151     int i;
152
153     comm->gossip.value = comm->local_rate;
154     comm->gossip.weight = 1.0;
155     comm->gossip.view = view_number;
156
157     for (i = 0; i < comm->remote_node_count; ++i) {
158         if (comm->remote_limiters[i].view < view_number) {
159             comm->remote_limiters[i].rate = 0;
160             memset(&comm->remote_limiters[i].incoming, 0, sizeof(in_neighbor_t));
161             memset(&comm->remote_limiters[i].outgoing, 0, sizeof(out_neighbor_t));
162             comm->remote_limiters[i].view = view_number;
163             comm->remote_limiters[i].view_confidence = UNSURE;
164         }
165     }
166
167     printlog(LOG_DEBUG, "ZK: Changing view to %d\n", view_number);
168 }
169
170 static void zk_drl_watcher(zhandle_t *zkhandle, int type, int state, const char *path, void *context_ptr) {
171     zkdrlcontext_t *context = (zkdrlcontext_t *) context_ptr;
172
173     pthread_rwlock_rdlock(context->limiter_lock);
174     pthread_mutex_lock(&context->comm->lock);
175
176     if (type == ZOO_SESSION_EVENT) {
177         if (state == ZOO_CONNECTED_STATE) {
178             /* We're newly connected - set that watch! */
179             zk_connected(zkhandle, context);
180         } else if (state == ZOO_CONNECTING_STATE) {
181             /* We're no longer connected.  Do something safe. */
182             zk_disconnected(zkhandle, context);
183         } else if (state == ZOO_EXPIRED_SESSION_STATE) {
184             printlog(LOG_DEBUG, "ZK:zookeeper session expired - reconnecting.\n");
185             context->zkhandle = zookeeper_init(context->zk_host, zk_drl_watcher, 10000, NULL, context, 0);
186         } else {
187             printlog(LOG_DEBUG, "ZK:Unhandled event zk_drl_watcher: type is %d, state is %d, path is %s\n", type, state, path);
188         }
189     } else if (type == ZOO_CHILD_EVENT) {
190         /* The list of child nodes in the group has changed.  Re-read the
191          * group membership list and re-set the watch. */
192         zk_membership_change(zkhandle, path, context);
193     } else {
194         printlog(LOG_DEBUG, "ZK:Unhandled event zk_drl_watcher: type is %d, state is %d, path is %s\n", type, state, path);
195     }
196
197     pthread_mutex_unlock(&context->comm->lock);
198     pthread_rwlock_unlock(context->limiter_lock);
199 }
200
201 int zk_drl_recv(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) {
202     if (msg->type == ACK) {
203         /* If ACK was received then reset the awol count */
204         if (msg->view == comm->gossip.view && msg->seqno == remote->outgoing.next_seqno - 1) {
205             /* Ack for most recent message.  Clear saved state. */
206             remote->outgoing.first_seqno = remote->outgoing.next_seqno;
207             remote->outgoing.saved_value = 0;
208             remote->outgoing.saved_weight = 0;
209             remote->awol = 0;
210
211         } else if (msg->view > comm->gossip.view) {
212             printlog(LOG_DEBUG, "ZK:Received ack for newer view, restarting.\n");
213             comm->restart_function(comm, msg->view);
214             remote->view_confidence = IN;
215             remote->awol = 0;
216         }
217         /* Ignore ack if it isn't for most recent message or its from an old view. */
218     } else if (msg->type == MSG) {
219         if (msg->view == comm->gossip.view) {
220             if (msg->min_seqno > remote->incoming.seen_seqno) {
221                 /* Entirely new information */
222                 remote->incoming.seen_seqno = msg->seqno;
223                 remote->incoming.saved_value = msg->value;
224                 remote->incoming.saved_weight = msg->weight;
225                 comm->gossip.value += msg->value;
226                 comm->gossip.weight += msg->weight;
227                 send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
228                 remote->awol = 0;
229             } 
230             else if (msg->seqno > remote->incoming.seen_seqno) {
231                 /* Only some of the message is old news. */
232                 double diff_value = msg->value - remote->incoming.saved_value;
233                 double diff_weight = msg->weight - remote->incoming.saved_weight;
234
235                 remote->incoming.seen_seqno = msg->seqno;
236                 remote->incoming.saved_value = msg->value;
237                 remote->incoming.saved_weight = msg->weight;
238
239                 comm->gossip.value += diff_value;
240                 comm->gossip.weight += diff_weight;
241                 send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
242                 remote->awol = 0;
243             } 
244             else {
245                 /* The entire message is old news. (Duplicate). */
246                 /* Do nothing. */
247             }
248         } else if (msg->view > comm->gossip.view) {
249             printlog(LOG_DEBUG, "ZK:received message with a newer viewstamp, restarting.\n");
250             comm->restart_function(comm, msg->view);
251             remote->view_confidence = IN;
252
253             remote->incoming.seen_seqno = msg->seqno;
254             remote->incoming.saved_value = msg->value;
255             remote->incoming.saved_weight = msg->weight;
256             comm->gossip.value += msg->value;
257             comm->gossip.weight += msg->weight;
258             send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
259             remote->awol = 0;
260         } else if (msg->view < comm->gossip.view) {
261             printlog(LOG_DEBUG, "ZK:received a message with an older viewstamp.\n");
262             if (remote->view_confidence == IN) {
263                 /* The sender is in the new view and doesn't know it yet. */
264                 send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
265             } else if (remote->view_confidence == UNSURE) {
266                 /* We don't know if he's in or not. */
267                 send_ack(id, remote, msg->seqno, UNSUREACK, comm->gossip.view);
268             } else if (remote->view_confidence == NOTIN) {
269                 /* He's out of luck... */
270                 send_ack(id, remote, msg->seqno, NACK, comm->gossip.view);
271             }
272             remote->awol = 0;
273         }
274     } else if (msg->type == UNSUREACK) {
275         /* We received an ack, but the ack sender was unsure whether or not
276          * we'll be a part of its new view.  Can't do much here... */
277         if (msg->view > comm->gossip.view) {
278             remote->view = msg->view;
279             remote->view_confidence = IN;
280             remote->awol = 0;
281             printlog(LOG_DEBUG, "ZK:received an UNSUREACK for view %d\n", msg->view);
282         }
283     } else if (msg->type == NACK) {
284         if (msg->view > comm->gossip.view) {
285             remote->view = msg->view;
286             remote->view_confidence = IN;
287             remote->awol = 0;
288
289             comm->connected = 0;
290             printlog(LOG_DEBUG, "ZK:received a NACK for view %d\n", msg->view);
291         }
292     }
293
294     return 0;
295 }
296
297 int zk_drl_init(comm_t *comm, uint32_t id, limiter_t *limiter, ident_config *config) {
298     zkdrlcontext_t *context = NULL;
299     comm->connected = 0;
300
301     if ((context = malloc(sizeof(zkdrlcontext_t))) == NULL) {
302         return ENOMEM;
303     }
304
305     context->zk_host = config->zk_host;
306     context->limiter_lock = &limiter->limiter_lock;
307     context->comm = comm;
308     context->id = id;
309     context->local_addr = limiter->localaddr;
310     comm->membership_state = context;
311
312     printlog(LOG_DEBUG, "ZK: Calling zk init\n");
313
314     context->zkhandle = zookeeper_init(context->zk_host, zk_drl_watcher, 10000, NULL, context, 0);
315
316     if (context->zkhandle == NULL) {
317         printlog(LOG_CRITICAL, "ZK: docs say that this can fail, but they don't say why. :(  Errno is %d\n", errno);
318         return EINVAL;
319     }
320
321     comm->recv_function = zk_drl_recv;
322     comm->send_function = send_udp_gossip;
323     comm->restart_function = zk_drl_restart;
324
325     return 0;
326 }
327
328 int zk_drl_close(comm_t *comm) {
329     zkdrlcontext_t *context = (zkdrlcontext_t *) comm->membership_state;
330
331     zookeeper_close(context->zkhandle);
332     
333     if (context && context->zk_host) {
334         free(context->zk_host);
335         context->zk_host = NULL;
336     }
337
338     if (context) {
339         free(context);
340         comm->membership_state = NULL;
341     }
342
343     return 0;
344 }
345
346 #endif  /* BUILD_ZOOKEEPER */