1 /* See the DRL-LICENSE file for this file's software license. */
3 /** Allows us to use pthread_rwlocks. */
4 #define _XOPEN_SOURCE 600
12 /* Socket functions. */
13 #include <sys/socket.h>
22 #include <sys/select.h>
26 #include "raterouter.h"
27 #include "ratetypes.h"
28 #include "drl_state.h"
29 #include "peer_comm.h"
33 #ifdef BUILD_ZOOKEEPER
37 extern limiter_t limiter;
/**
 * Initializes the group membership layer selected by comm->gossip.membership.
 *
 * The switch dispatches to swim_init(), or to zk_drl_init() when the file is
 * compiled with BUILD_ZOOKEEPER, and returns that routine's result.
 *
 * @param comm   Comm structure whose gossip.membership selects the routine.
 * @param id     Id forwarded to the membership init routine.
 * @param config Configuration; forwarded to zk_drl_init() only.
 * @return The value returned by the selected init routine.
 */
39 static int group_membership_init(comm_t *comm, uint32_t id, ident_config *config) {
40 switch (comm->gossip.membership) {
42 return swim_init(comm, id);
45 #ifdef BUILD_ZOOKEEPER
/* The ZooKeeper init additionally receives the file-level limiter and the config. */
48 return zk_drl_init(comm, id, &limiter, config);
/* NOTE(review): presumably reached only for an unrecognized membership value -- confirm. */
54 printlog(LOG_CRITICAL, "drl_state.c: This shouldn't happen!\n");
/**
 * Tears down the group membership layer, switching on
 * comm->gossip.membership (the same selector group_membership_init() uses).
 *
 * @param comm Comm structure whose membership layer is being torn down.
 */
59 static void group_membership_teardown(comm_t *comm) {
60 switch (comm->gossip.membership) {
65 #ifdef BUILD_ZOOKEEPER
/* NOTE(review): presumably reached only for an unrecognized membership value -- confirm. */
74 printlog(LOG_CRITICAL, "drl_state.c: This shouldn't happen!\n");
/**
 * No-op restart callback. new_comm() installs it as the default
 * comm->restart_function for both comm fabrics.
 *
 * @param comm        Unused.
 * @param view_number Unused.
 */
78 void null_restart_function(comm_t *comm, int32_t view_number) {
79 /* Intentionally empty. */
/**
 * Initializes a comm_t from an ident_config and a list of peers.
 *
 * Zeroes *comm, copies the fabric and gossip settings out of config, then sets
 * up the lock, the index array, the default send/recv/restart function
 * pointers, the remote-node map, the remote_limiters array and the selected
 * array. For the COMM_GOSSIP fabric it finally runs group_membership_init().
 *
 * On the failure paths below, the lock, map, remote_limiters and selected
 * resources acquired so far are released.
 *
 * @param comm         Structure to initialize; previous contents are wiped.
 * @param config       Supplies commfabric, peer_count, branch, membership,
 *                     failure_behavior and id.
 * @param remote_nodes Array indexed 0..config->peer_count-1 providing each
 *                     peer's addr and port.
 * @return int status. NOTE(review): confirm the exact success/error values
 *         against the header.
 */
82 int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) {
/* Start from an all-zero structure. */
86 memset(comm, 0, sizeof(comm_t));
88 comm->comm_fabric = config->commfabric;
/* The transport is set to UDP unconditionally; config is not consulted. */
89 comm->transport_proto = UDP;
90 comm->remote_node_count = config->peer_count;
91 comm->gossip.gossip_branch = config->branch;
92 comm->gossip.membership = config->membership;
93 comm->gossip.failure_behavior = config->failure_behavior;
94 comm->gossip.weight = 1.0;
96 pthread_mutex_init(&comm->lock, NULL);
98 // Allocate the index array and fill it with 0..remote_node_count-1.
/* NOTE(review): the malloc result is passed to memset without a NULL check,
 * and the zero fill is immediately overwritten by the loop that follows. */
99 comm->indices = (int*) malloc(sizeof(int)*comm->remote_node_count);
100 memset(comm->indices, 0, sizeof(int)*comm->remote_node_count);
101 for(i = 0; i < comm->remote_node_count; i++)
102 comm->indices[i] = i;
103 comm->shuffle_index = comm->remote_node_count;
105 /* Set default comm function pointers. These may get overwritten later
106 * by more specific initialization routines such as group membership
108 switch (config->commfabric) {
110 comm->send_function = send_udp_mesh;
111 comm->recv_function = recv_mesh;
112 comm->restart_function = null_restart_function;
115 comm->send_function = send_udp_gossip;
116 comm->recv_function = recv_gossip;
117 comm->restart_function = null_restart_function;
121 comm->remote_node_map = allocate_map();
122 if (comm->remote_node_map == NULL) {
123 pthread_mutex_destroy(&comm->lock);
127 /* Allocate remote_limiters array and fill it in. Add remotes to map. */
128 comm->remote_limiters = malloc(config->peer_count * sizeof(remote_limiter_t));
130 if (comm->remote_limiters == NULL) {
131 pthread_mutex_destroy(&comm->lock);
132 free_map(comm->remote_node_map, 0);
136 memset(comm->remote_limiters, 0, config->peer_count * sizeof(remote_limiter_t));
/* One remote_limiter_t per peer: copy its address, mark it REACHABLE and
 * reset its counters. */
138 for (i = 0; i < config->peer_count; ++i) {
139 comm->remote_limiters[i].addr = remote_nodes[i].addr;
140 comm->remote_limiters[i].port = remote_nodes[i].port;
141 comm->remote_limiters[i].outgoing.next_seqno = 1;
142 comm->remote_limiters[i].reachability = REACHABLE;
143 comm->remote_limiters[i].awol = 0;
144 comm->remote_limiters[i].count_rounds = 0;
145 comm->remote_limiters[i].count_awol = 0;
146 comm->remote_limiters[i].count_alive = 0;
/* The map key is the caller's remote_node_t entry; the mapped value points
 * into comm->remote_limiters.
 * NOTE(review): confirm whether map_insert copies the key or keeps the
 * pointer into remote_nodes. */
147 map_insert(comm->remote_node_map, (void *) &(remote_nodes[i]),
148 sizeof(remote_node_t), &comm->remote_limiters[i]);
151 /* Allocate and initialize selected. */
152 comm->selected = malloc(config->branch * sizeof(int));
153 if (comm->selected == NULL) {
154 pthread_mutex_destroy(&comm->lock);
155 free_map(comm->remote_node_map, 0);
156 free(comm->remote_limiters);
/* Every one of the config->branch slots starts at -1. */
160 for (i = 0; i < config->branch; ++i) {
161 comm->selected[i] = -1;
/* Only the gossip fabric has a group membership layer to bring up. */
164 if (config->commfabric == COMM_GOSSIP) {
165 result = group_membership_init(comm, config->id, config);
/* NOTE(review): none of the cleanup sequences in this function release
 * comm->indices -- confirm whether that is a leak on failure. */
167 pthread_mutex_destroy(&comm->lock);
168 free_map(comm->remote_node_map, 0);
169 free(comm->remote_limiters);
170 free(comm->selected);
/**
 * Releases the resources held by a comm_t.
 *
 * Tears down group membership for the COMM_GOSSIP fabric, frees the
 * remote_limiters, remote_nodes and selected arrays when they are non-NULL,
 * frees the remote-node map and destroys the lock.
 *
 * NOTE(review): comm->indices, which new_comm() allocates, is not freed by
 * any statement shown here -- confirm whether it is released elsewhere.
 *
 * @param comm Comm structure to tear down.
 */
177 void free_comm(comm_t *comm) {
179 if (comm->comm_fabric == COMM_GOSSIP) {
180 group_membership_teardown(comm);
183 if (comm->remote_limiters) {
184 free(comm->remote_limiters);
187 if (comm->remote_nodes) {
188 free(comm->remote_nodes);
191 if (comm->remote_node_map) {
192 free_map(comm->remote_node_map, 0);
195 pthread_mutex_destroy(&comm->lock);
197 if (comm->selected) {
198 free(comm->selected);
/**
 * Reads the current aggregate value and effective global limit from the
 * comm layer, holding comm->lock for the duration.
 *
 * COMM_MESH: *effective_global starts at global_limit and loses one equal
 * share, global_limit / (remote_node_count + 1), for every remote that is not
 * REACHABLE; remote rates and comm->local_rate are added into *aggregate.
 *
 * COMM_GOSSIP: the estimate is (gossip.value / gossip.weight) scaled by
 * (remote_node_count + 1) and clamped at zero. Depending on
 * gossip.failure_behavior (PANIC or QUORUM) the function may instead fall
 * back to the local rate and a single 1/(n+1) share of global_limit.
 *
 * Any other fabric is logged at LOG_CRITICAL and the lock is released.
 *
 * @param aggregate        Out: aggregate rate/weight.
 * @param effective_global Out: limit to enforce given peer reachability.
 * @param comm             Comm structure to read from.
 * @param global_limit     Configured global limit.
 * @return int status. NOTE(review): confirm the exact values against the
 *         header.
 */
203 int read_comm(double *aggregate, uint32_t *effective_global, comm_t *comm, uint32_t global_limit) {
204 remote_limiter_t *remote;
206 pthread_mutex_lock(&comm->lock);
207 if (comm->comm_fabric == COMM_MESH) {
209 *effective_global = global_limit;
210 map_reset_iterate(comm->remote_node_map);
211 while ((remote = map_next(comm->remote_node_map))) {
212 if (remote->reachability != REACHABLE) {
213 printlog(LOG_WARN, "AWOL remote limiter detected.\n");
/* Give up the unreachable peer's equal share of the global limit. */
214 *effective_global -= (global_limit / (comm->remote_node_count + 1));
216 /* remote->rate corresponds to the rate (GRD) or weight (FPS)
217 * generated by the peer remote. */
218 *aggregate += remote->rate;
/* Include this node's own contribution. */
221 *aggregate += comm->local_rate;
222 } else if (comm->comm_fabric == COMM_GOSSIP) {
/* Scale the per-node gossip estimate up to the whole group (peers + self). */
225 value = (comm->gossip.value / comm->gossip.weight);
226 value *= (comm->remote_node_count + 1);
228 /* Look up the failure handling policy and check to see if it
229 * is currently relevant. */
230 if (comm->gossip.failure_behavior == PANIC) {
/* NOTE(review): presumably either being disconnected or having any
 * unreachable peer triggers the panic fallback below -- confirm. */
232 if (!comm->connected) {
236 for (i = 0; i < comm->remote_node_count; ++i) {
237 if (comm->remote_limiters[i].reachability != REACHABLE) {
243 printlog(LOG_DEBUG, "GOSSIP: Panicking!\n");
/* Panic fallback: local rate only, with a single 1/(n+1) share. */
244 *aggregate = comm->local_rate;
245 *effective_global = (global_limit / (comm->remote_node_count + 1));
/* Normal case: clamp the gossip estimate at zero and use the full limit. */
247 *aggregate = (value > 0) ? value : 0;
248 *effective_global = global_limit;
250 } else if (comm->gossip.failure_behavior == QUORUM) {
251 *effective_global = global_limit;
252 if (comm->connected) {
/* Connected: drop one equal share per unreachable peer. */
253 for (i = 0; i < comm->remote_node_count; ++i) {
254 if (comm->remote_limiters[i].reachability != REACHABLE) {
255 *effective_global -= (global_limit / (comm->remote_node_count + 1));
258 *aggregate = (value > 0) ? value : 0;
260 /* Not part of the Quorum - do 1/n. */
261 printlog(LOG_DEBUG, "GOSSIP: Not in the quorum...Panicking!\n");
262 *aggregate = comm->local_rate;
263 *effective_global = (global_limit / (comm->remote_node_count + 1));
266 printlog(LOG_DEBUG, "GOSSIP: Read aggregate of %.3f from comm layer.\n", *aggregate);
268 printlog(LOG_CRITICAL, "read_comm: Invalid comm fabric: %d.\n",
/* The invalid-fabric path releases the lock itself. */
270 pthread_mutex_unlock(&comm->lock);
273 pthread_mutex_unlock(&comm->lock);
/**
 * Records this node's latest local value in the comm layer, holding
 * comm->lock for the duration.
 *
 * For both fabrics the previous local_rate is saved in last_local_rate
 * before local_rate is overwritten. For COMM_GOSSIP the change in local rate
 * is also added to gossip.value. Any other fabric is logged at LOG_CRITICAL
 * and the lock is released.
 *
 * @param comm  Comm structure to update.
 * @param value New local value.
 * @return int status. NOTE(review): confirm the exact values against the
 *         header.
 */
278 int write_local_value(comm_t *comm, const double value) {
279 pthread_mutex_lock(&comm->lock);
280 if (comm->comm_fabric == COMM_MESH) {
281 comm->last_local_rate = comm->local_rate;
282 comm->local_rate = value;
283 } else if (comm->comm_fabric == COMM_GOSSIP) {
284 comm->last_local_rate = comm->local_rate;
285 comm->local_rate = value;
/* Fold only the delta into the gossip value. */
286 comm->gossip.value += (comm->local_rate - comm->last_local_rate);
287 printlog(LOG_DEBUG, "GOSSIP: value: %.3f, new gossip.value: %.3f\n", value, comm->gossip.value);
290 printlog(LOG_CRITICAL, "write_local_value: Invalid comm fabric: %d.\n",
/* The invalid-fabric path releases the lock itself. */
292 pthread_mutex_unlock(&comm->lock);
295 pthread_mutex_unlock(&comm->lock);
/**
 * Sends an update through the comm's configured send_function while holding
 * comm->lock.
 *
 * @param comm Comm structure whose send_function is invoked.
 * @param id   Id forwarded to the send function.
 * @return int. NOTE(review): presumably the send function's result, which is
 *         stored in result -- confirm.
 */
300 int send_update(comm_t *comm, uint32_t id) {
303 pthread_mutex_lock(&comm->lock);
/* The socket comes from the file-level limiter, not from comm. */
305 result = comm->send_function(comm, id, limiter.udp_socket);
307 pthread_mutex_unlock(&comm->lock);
312 void *limiter_receive_thread(void *unused) {
313 printlog(LOG_DEBUG, "GOSSIP: Starting the limiter_receive thread.\n");
314 sigset_t signal_mask;
316 sigemptyset(&signal_mask);
317 sigaddset(&signal_mask, SIGHUP);
318 sigaddset(&signal_mask, SIGUSR1);
319 sigaddset(&signal_mask, SIGUSR2);
320 sigaddset(&signal_mask, SIGRTMAX);
321 pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
323 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
324 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);