[distributedratelimiting.git] / drl / drl_state.c
/* See the DRL-LICENSE file for this file's software license. */

/** Allows us to use pthread_rwlocks. */
#define _XOPEN_SOURCE 600

/* malloc(), NULL */
#include <stdlib.h>

/* getpid() */
#include <unistd.h>

/* Socket functions. */
#include <sys/socket.h>

/* memset() */
#include <string.h>

/* EINVAL, ENOMEM */
#include <errno.h>

/* FD_ZERO() */
#include <sys/select.h>

#include <assert.h>

/* pthread_mutex_*(), pthread_setcancelstate() */
#include <pthread.h>

/* sigemptyset(), pthread_sigmask() */
#include <signal.h>

26 #include "raterouter.h"
27 #include "ratetypes.h"
28 #include "drl_state.h"
29 #include "peer_comm.h"
30 #include "swim.h"
31 #include "logging.h"
32
33 #ifdef BUILD_ZOOKEEPER
34     #include "zk_drl.h"
35 #endif
36
37 extern limiter_t limiter;
38
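/** Starts the configured group membership protocol (SWIM, or ZooKeeper
 *  when built with BUILD_ZOOKEEPER) for this comm structure.  Returns 0
 *  on success or a nonzero error code on failure. */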
static int group_membership_init(comm_t *comm, uint32_t id, ident_config *config) {
    switch (comm->gossip.membership) {
        case SWIM:
            return swim_init(comm, id);

#ifdef BUILD_ZOOKEEPER

        case ZOOKEEPER:
            return zk_drl_init(comm, id, &limiter, config);

#endif

        default:
            printlog(LOG_CRITICAL, "group_membership_init: unknown membership protocol %d.\n",
                     comm->gossip.membership);
            return EINVAL;
    }
}

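/** Shuts down whichever group membership protocol group_membership_init()
 *  started. */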
static void group_membership_teardown(comm_t *comm) {
    switch (comm->gossip.membership) {
        case SWIM:
            swim_teardown(comm);
            break;

#ifdef BUILD_ZOOKEEPER

        case ZOOKEEPER:
            zk_drl_close(comm);
            break;

#endif

        default:
            printlog(LOG_CRITICAL, "group_membership_teardown: unknown membership protocol %d.\n",
                     comm->gossip.membership);
    }
}

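/** Default restart_function for comm fabrics that need no restart
 *  handling; intentionally a no-op. */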
void null_restart_function(comm_t *comm, int32_t view_number) {
    /* Intentionally empty. */
}

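/** Initializes a comm_t from the identity configuration and the list of
 *  remote limiters: builds the peer map and per-peer state, installs the
 *  send/receive function pointers for the chosen comm fabric, and starts
 *  group membership when gossip is in use.  Returns 0 on success or an
 *  error code (ENOMEM, EINVAL) on failure. */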
int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) {
    int i;
    int result = 0;

    memset(comm, 0, sizeof(comm_t));

    comm->comm_fabric = config->commfabric;
    comm->transport_proto = UDP;
    comm->remote_node_count = config->peer_count;
    comm->gossip.gossip_branch = config->branch;
    comm->gossip.membership = config->membership;
    comm->gossip.failure_behavior = config->failure_behavior;
    comm->gossip.weight = 1.0;

    pthread_mutex_init(&comm->lock, NULL);

    /* Allocate and initialize the peer shuffle indices. */
    comm->indices = malloc(sizeof(int) * comm->remote_node_count);
    if (comm->indices == NULL) {
        pthread_mutex_destroy(&comm->lock);
        return ENOMEM;
    }
    for (i = 0; i < comm->remote_node_count; i++) {
        comm->indices[i] = i;
    }
    comm->shuffle_index = comm->remote_node_count;

    /* Set default comm function pointers. These may get overwritten later
     * by more specific initialization routines such as group membership
     * init calls. */
    switch (config->commfabric) {
        case COMM_MESH:
            comm->send_function = send_udp_mesh;
            comm->recv_function = recv_mesh;
            comm->restart_function = null_restart_function;
            break;
        case COMM_GOSSIP:
            comm->send_function = send_udp_gossip;
            comm->recv_function = recv_gossip;
            comm->restart_function = null_restart_function;
            break;
    }


    comm->remote_node_map = allocate_map();
    if (comm->remote_node_map == NULL) {
        free(comm->indices);
        pthread_mutex_destroy(&comm->lock);
        return ENOMEM;
    }

    /* Allocate remote_limiters array and fill it in. Add remotes to map. */
    comm->remote_limiters = malloc(config->peer_count * sizeof(remote_limiter_t));

    if (comm->remote_limiters == NULL) {
        free(comm->indices);
        pthread_mutex_destroy(&comm->lock);
        free_map(comm->remote_node_map, 0);
        return ENOMEM;
    }

    memset(comm->remote_limiters, 0, config->peer_count * sizeof(remote_limiter_t));

    for (i = 0; i < config->peer_count; ++i) {
        comm->remote_limiters[i].addr = remote_nodes[i].addr;
        comm->remote_limiters[i].port = remote_nodes[i].port;
        comm->remote_limiters[i].outgoing.next_seqno = 1;
        comm->remote_limiters[i].reachability = REACHABLE;
        comm->remote_limiters[i].awol = 0;
        comm->remote_limiters[i].count_rounds = 0;
        comm->remote_limiters[i].count_awol = 0;
        comm->remote_limiters[i].count_alive = 0;
        map_insert(comm->remote_node_map, (void *) &(remote_nodes[i]),
                   sizeof(remote_node_t), &comm->remote_limiters[i]);
    }

    /* Allocate and initialize selected. */
    comm->selected = malloc(config->branch * sizeof(int));
    if (comm->selected == NULL) {
        free(comm->indices);
        pthread_mutex_destroy(&comm->lock);
        free_map(comm->remote_node_map, 0);
        free(comm->remote_limiters);
        return ENOMEM;
    }

    for (i = 0; i < config->branch; ++i) {
        comm->selected[i] = -1;
    }

    if (config->commfabric == COMM_GOSSIP) {
        result = group_membership_init(comm, config->id, config);
        if (result) {
            free(comm->indices);
            pthread_mutex_destroy(&comm->lock);
            free_map(comm->remote_node_map, 0);
            free(comm->remote_limiters);
            free(comm->selected);
        }
    }

    return result;
}

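/** Releases everything allocated by new_comm(), tearing down group
 *  membership first when the comm fabric is gossip. */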
void free_comm(comm_t *comm) {
    if (comm) {
        if (comm->comm_fabric == COMM_GOSSIP) {
            group_membership_teardown(comm);
        }

        if (comm->remote_limiters) {
            free(comm->remote_limiters);
        }

        if (comm->remote_nodes) {
            free(comm->remote_nodes);
        }

        if (comm->remote_node_map) {
            free_map(comm->remote_node_map, 0);
        }

        if (comm->indices) {
            free(comm->indices);
        }

        pthread_mutex_destroy(&comm->lock);

        if (comm->selected) {
            free(comm->selected);
        }
    }
}

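/** Computes the current aggregate demand and the effective global limit.
 *  Under mesh, the aggregate is the sum of the local rate and every
 *  reachable peer's reported rate, and the effective limit is reduced by
 *  global_limit / n for each unreachable peer (n = all limiters).  Under
 *  gossip, the aggregate comes from the gossip estimate, adjusted by the
 *  configured failure policy (PANIC or QUORUM).  Returns 0 on success or
 *  EINVAL for an unknown comm fabric. */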
int read_comm(double *aggregate, uint32_t *effective_global, comm_t *comm, uint32_t global_limit) {
    remote_limiter_t *remote;

    pthread_mutex_lock(&comm->lock);
    if (comm->comm_fabric == COMM_MESH) {
        *aggregate = 0;
        *effective_global = global_limit;
        map_reset_iterate(comm->remote_node_map);
        while ((remote = map_next(comm->remote_node_map))) {
            if (remote->reachability != REACHABLE) {
                printlog(LOG_WARN, "AWOL remote limiter detected.\n");
                *effective_global -= (global_limit / (comm->remote_node_count + 1));
            } else {
                /* remote->rate corresponds to the rate (GRD) or weight (FPS)
                 * generated by the remote peer. */
                *aggregate += remote->rate;
            }
        }
        *aggregate += comm->local_rate;
    } else if (comm->comm_fabric == COMM_GOSSIP) {
        double value = 0;
        int i;

        /* Gossip estimate of the aggregate: the averaged value
         * (value / weight) scaled by the total number of limiters
         * (remote peers plus this one). */
        value = (comm->gossip.value / comm->gossip.weight);
        value *= (comm->remote_node_count + 1);

        /* Look up the failure handling policy and check whether it is
         * currently relevant. */
        if (comm->gossip.failure_behavior == PANIC) {
            int panic = 0;
            if (!comm->connected) {
                panic = 1;
            }

            for (i = 0; i < comm->remote_node_count; ++i) {
                if (comm->remote_limiters[i].reachability != REACHABLE) {
                    panic = 1;
                }
            }

            if (panic) {
                printlog(LOG_DEBUG, "GOSSIP: Panicking!\n");
                *aggregate = comm->local_rate;
                *effective_global = (global_limit / (comm->remote_node_count + 1));
            } else {
                *aggregate = (value > 0) ? value : 0;
                *effective_global = global_limit;
            }
        } else if (comm->gossip.failure_behavior == QUORUM) {
            *effective_global = global_limit;
            if (comm->connected) {
                for (i = 0; i < comm->remote_node_count; ++i) {
                    if (comm->remote_limiters[i].reachability != REACHABLE) {
                        *effective_global -= (global_limit / (comm->remote_node_count + 1));
                    }
                }
                *aggregate = (value > 0) ? value : 0;
            } else {
                /* Not part of the Quorum - do 1/n. */
                printlog(LOG_DEBUG, "GOSSIP: Not in the quorum...Panicking!\n");
                *aggregate = comm->local_rate;
                *effective_global = (global_limit / (comm->remote_node_count + 1));
            }
        }
        printlog(LOG_DEBUG, "GOSSIP: Read aggregate of %.3f from comm layer.\n", *aggregate);
    } else {
        printlog(LOG_CRITICAL, "read_comm: Invalid comm fabric: %d.\n",
                 comm->comm_fabric);
        pthread_mutex_unlock(&comm->lock);
        return EINVAL;
    }
    pthread_mutex_unlock(&comm->lock);

    return 0;
}

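/** Records the limiter's new local rate (GRD) or weight (FPS).  Under
 *  gossip, the change from the previous value is also folded into
 *  gossip.value.  Returns 0 on success or EINVAL for an unknown comm
 *  fabric. */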
int write_local_value(comm_t *comm, const double value) {
    pthread_mutex_lock(&comm->lock);
    if (comm->comm_fabric == COMM_MESH) {
        comm->last_local_rate = comm->local_rate;
        comm->local_rate = value;
    } else if (comm->comm_fabric == COMM_GOSSIP) {
        comm->last_local_rate = comm->local_rate;
        comm->local_rate = value;
        comm->gossip.value += (comm->local_rate - comm->last_local_rate);
        printlog(LOG_DEBUG, "GOSSIP: value: %.3f, new gossip.value: %.3f\n", value, comm->gossip.value);
    } else {
        printlog(LOG_CRITICAL, "write_local_value: Invalid comm fabric: %d.\n",
                 comm->comm_fabric);
        pthread_mutex_unlock(&comm->lock);
        return EINVAL;
    }
    pthread_mutex_unlock(&comm->lock);

    return 0;
}

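/** Pushes this limiter's state to peers by invoking the comm fabric's
 *  send function on the limiter's UDP socket.  Returns whatever the send
 *  function returns. */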
int send_update(comm_t *comm, uint32_t id) {
    int result = 0;

    pthread_mutex_lock(&comm->lock);

    result = comm->send_function(comm, id, limiter.udp_socket);

    pthread_mutex_unlock(&comm->lock);

    return result;
}

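/** Thread body for the receive loop: blocks SIGHUP, SIGUSR1, and SIGUSR2
 *  in this thread, enables deferred cancellation, and then loops forever
 *  calling limiter_receive(). */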
void *limiter_receive_thread(void *unused) {
    sigset_t signal_mask;

    printlog(LOG_DEBUG, "GOSSIP: Starting the limiter_receive thread.\n");

    sigemptyset(&signal_mask);
    sigaddset(&signal_mask, SIGHUP);
    sigaddset(&signal_mask, SIGUSR1);
    sigaddset(&signal_mask, SIGUSR2);
    pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);

    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    while (1) {
        limiter_receive();
    }
    pthread_exit(NULL);
}