#include "ratetypes.h"
#include "drl_state.h"
#include "peer_comm.h"
+#include "swim.h"
#include "logging.h"
+#ifdef BUILD_ZOOKEEPER
+ #include "zk_drl.h"
+#endif
+
extern limiter_t limiter;
+static int group_membership_init(comm_t *comm, uint32_t id, ident_config *config) {
+ switch (comm->gossip.membership) {
+ case SWIM:
+ return swim_init(comm, id);
+ break;
+
+#ifdef BUILD_ZOOKEEPER
+
+ case ZOOKEEPER:
+ return zk_drl_init(comm, id, &limiter, config);
+ break;
+
+#endif
+
+ default:
+ printlog(LOG_CRITICAL, "drl_state.c: This shouldn't happen!\n");
+ return EINVAL;
+ }
+}
+
+static void group_membership_teardown(comm_t *comm) {
+ switch (comm->gossip.membership) {
+ case SWIM:
+ swim_teardown(comm);
+ break;
+
+#ifdef BUILD_ZOOKEEPER
+
+ case ZOOKEEPER:
+ zk_drl_close(comm);
+ break;
+
+#endif
+
+ default:
+ printlog(LOG_CRITICAL, "drl_state.c: This shouldn't happen!\n");
+ }
+}
+
+void null_restart_function(comm_t *comm, int32_t view_number) {
+ /* Intentionally empty. */
+}
+
int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) {
int i;
+ int result = 0;
memset(comm, 0, sizeof(comm_t));
comm->transport_proto = UDP;
comm->remote_node_count = config->peer_count;
comm->gossip.gossip_branch = config->branch;
+ comm->gossip.membership = config->membership;
+ comm->gossip.failure_behavior = config->failure_behavior;
comm->gossip.weight = 1.0;
pthread_mutex_init(&comm->lock, NULL);
-
- /* Set send function. */
+
+ // allocate memory to the indices
+ comm->indices = (int*) malloc(sizeof(int)*comm->remote_node_count);
+ memset(comm->indices, 0, sizeof(int)*comm->remote_node_count);
+ for(i = 0; i < comm->remote_node_count; i++)
+ comm->indices[i] = i;
+ comm->shuffle_index = comm->remote_node_count;
+
+ /* Set default comm function pointers. These may get overwritten later
+ * by more specific initialization routines such as group membership
+ * init calls. */
switch (config->commfabric) {
case COMM_MESH:
comm->send_function = send_udp_mesh;
+ comm->recv_function = recv_mesh;
+ comm->restart_function = null_restart_function;
break;
case COMM_GOSSIP:
comm->send_function = send_udp_gossip;
+ comm->recv_function = recv_gossip;
+ comm->restart_function = null_restart_function;
break;
}
}
/* Allocate remote_limiters array and fill it in. Add remotes to map. */
- comm->remote_limiters =
- malloc(config->peer_count * sizeof(remote_limiter_t));
+ comm->remote_limiters = malloc(config->peer_count * sizeof(remote_limiter_t));
if (comm->remote_limiters == NULL) {
pthread_mutex_destroy(&comm->lock);
comm->remote_limiters[i].addr = remote_nodes[i].addr;
comm->remote_limiters[i].port = remote_nodes[i].port;
comm->remote_limiters[i].outgoing.next_seqno = 1;
+ comm->remote_limiters[i].reachability = REACHABLE;
+ comm->remote_limiters[i].awol = 0;
+ comm->remote_limiters[i].count_rounds = 0;
+ comm->remote_limiters[i].count_awol = 0;
+ comm->remote_limiters[i].count_alive = 0;
map_insert(comm->remote_node_map, (void *) &(remote_nodes[i]),
sizeof(remote_node_t), &comm->remote_limiters[i]);
}
-
- /* Allocate and initialize retrys. */
- comm->retrys = malloc(config->branch * sizeof(int));
- if (comm->retrys == NULL) {
+
+ /* Allocate and initialize selected. */
+ comm->selected = malloc(config->branch * sizeof(int));
+ if (comm->selected == NULL) {
pthread_mutex_destroy(&comm->lock);
free_map(comm->remote_node_map, 0);
free(comm->remote_limiters);
}
for (i = 0; i < config->branch; ++i) {
- comm->retrys[i] = -1;
+ comm->selected[i] = -1;
}
- return 0;
+ if (config->commfabric == COMM_GOSSIP) {
+ result = group_membership_init(comm, config->id, config);
+ if (result) {
+ pthread_mutex_destroy(&comm->lock);
+ free_map(comm->remote_node_map, 0);
+ free(comm->remote_limiters);
+ free(comm->selected);
+ }
+ }
+
+ return result;
}
void free_comm(comm_t *comm) {
if (comm) {
+ if (comm->comm_fabric == COMM_GOSSIP) {
+ group_membership_teardown(comm);
+ }
+
if (comm->remote_limiters) {
free(comm->remote_limiters);
}
pthread_mutex_destroy(&comm->lock);
- if (comm->retrys) {
- free(comm->retrys);
+ if (comm->selected) {
+ free(comm->selected);
}
}
}
-int read_comm(comm_t *comm, double *aggregate) {
+int read_comm(double *aggregate, uint32_t *effective_global, comm_t *comm, uint32_t global_limit) {
remote_limiter_t *remote;
pthread_mutex_lock(&comm->lock);
if (comm->comm_fabric == COMM_MESH) {
*aggregate = 0;
+ *effective_global = global_limit;
map_reset_iterate(comm->remote_node_map);
while ((remote = map_next(comm->remote_node_map))) {
- /* remote->rate corresponds to the rate (GRD) or weight (FPS)
- * in generated by the peer remote. */
- *aggregate += remote->rate;
-
- /* If we continue to read it without having heard an update,
- * we start to decay its value. */
- if (remote->awol >= REMOTE_AWOL_THRESHOLD) {
- remote->rate = remote->rate / 2;
+ if (remote->reachability != REACHABLE) {
+ printlog(LOG_WARN, "AWOL remote limiter detected.\n");
+ *effective_global -= (global_limit / (comm->remote_node_count + 1));
} else {
- remote->awol++;
+ /* remote->rate corresponds to the rate (GRD) or weight (FPS)
+ * in generated by the peer remote. */
+ *aggregate += remote->rate;
}
}
*aggregate += comm->local_rate;
} else if (comm->comm_fabric == COMM_GOSSIP) {
double value = 0;
+ int i;
value = (comm->gossip.value / comm->gossip.weight);
value *= (comm->remote_node_count + 1);
- /* Keep around the last value so that we don't stupidly pick 0 when
- * we're negative. If we pick 0, it looks to the limiter like it
- * has free reign and it will take 100% of the rate allocation for
- * itself. */
- if (value <= 0) {
- //*aggregate = comm->gossip.last_nonzero;
- *aggregate = 0;
- //printf("*****Gossip value is %.3f (%u) ((%d))\n", value, *aggregate, (int) *aggregate);
- } else {
- *aggregate = value;
- comm->gossip.last_nonzero = *aggregate;
- //printf("Gossip value is %.3f (%u) ((%d))\n", value, *aggregate, (int) *aggregate);
+ /* Look up the failure handling policy and check to see if it is
+ * is currently relevant. */
+ if (comm->gossip.failure_behavior == PANIC) {
+ int panic = 0;
+ if (!comm->connected) {
+ panic = 1;
+ }
+
+ for (i = 0; i < comm->remote_node_count; ++i) {
+ if (comm->remote_limiters[i].reachability != REACHABLE) {
+ panic = 1;
+ }
+ }
+
+ if (panic) {
+ printlog(LOG_DEBUG, "GOSSIP: Panicking!\n");
+ *aggregate = comm->local_rate;
+ *effective_global = (global_limit / (comm->remote_node_count + 1));
+ } else {
+ *aggregate = (value > 0) ? value : 0;
+ *effective_global = global_limit;
+ }
+ } else if (comm->gossip.failure_behavior == QUORUM) {
+ *effective_global = global_limit;
+ if (comm->connected) {
+ for (i = 0; i < comm->remote_node_count; ++i) {
+ if (comm->remote_limiters[i].reachability != REACHABLE) {
+ *effective_global -= (global_limit / (comm->remote_node_count + 1));
+ }
+ }
+ *aggregate = (value > 0) ? value : 0;
+ } else {
+ /* Not part of the Quorum - do 1/n. */
+ printlog(LOG_DEBUG, "GOSSIP: Not in the quorum...Panicking!\n");
+ *aggregate = comm->local_rate;
+ *effective_global = (global_limit / (comm->remote_node_count + 1));
+ }
}
+ printlog(LOG_DEBUG, "GOSSIP: Read aggregate of %.3f from comm layer.\n", *aggregate);
} else {
printlog(LOG_CRITICAL, "read_comm: Invalid comm fabric: %d.\n",
comm->comm_fabric);
}
pthread_mutex_unlock(&comm->lock);
- //printf("read: %.3f\n", *aggregate);
-
return 0;
}
if (comm->comm_fabric == COMM_MESH) {
comm->last_local_rate = comm->local_rate;
comm->local_rate = value;
- comm->rate_change = comm->local_rate - comm->last_local_rate;
} else if (comm->comm_fabric == COMM_GOSSIP) {
comm->last_local_rate = comm->local_rate;
comm->local_rate = value;
- comm->rate_change = comm->local_rate - comm->last_local_rate;
- /*printf("new: %f, old: %f, weight: %f, diff: %f\n", comm->gossip.value + (comm->gossip.weight * comm->rate_change), comm->gossip.value, comm->gossip.weight, comm->rate_change);*/
- /*comm->gossip.value = comm->gossip.value + (comm->gossip.weight * comm->rate_change);*/
- comm->gossip.value += comm->rate_change;
+ comm->gossip.value += (comm->local_rate - comm->last_local_rate);
+ printlog(LOG_DEBUG, "GOSSIP: value: %.3f, new gossip.value: %.3f\n", value, comm->gossip.value);
}
else {
printlog(LOG_CRITICAL, "write_local_value: Invalid comm fabric: %d.\n",
}
void *limiter_receive_thread(void *unused) {
+ printlog(LOG_DEBUG, "GOSSIP: Starting the limiter_receive thread.\n");
sigset_t signal_mask;
sigemptyset(&signal_mask);
sigaddset(&signal_mask, SIGHUP);
+ sigaddset(&signal_mask, SIGUSR1);
+ sigaddset(&signal_mask, SIGUSR2);
+ sigaddset(&signal_mask, SIGRTMAX);
pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);