Remapped signals and added support for drl to reopen logfiles.
[distributedratelimiting.git] / drl / drl_state.c
index eba79ba..339c109 100644 (file)
 #include "ratetypes.h"
 #include "drl_state.h"
 #include "peer_comm.h"
+#include "swim.h"
 #include "logging.h"
 
+#ifdef BUILD_ZOOKEEPER
+    #include "zk_drl.h"
+#endif
+
 extern limiter_t limiter;
 
+static int group_membership_init(comm_t *comm, uint32_t id, ident_config *config) { /* Calls the init routine (swim_init or zk_drl_init) matching comm->gossip.membership and returns its result; returns EINVAL when no case matches. */
+    switch (comm->gossip.membership) {
+        case SWIM:
+            return swim_init(comm, id);
+        break; /* Unreachable: the case returns on the line above. */
+
+#ifdef BUILD_ZOOKEEPER
+
+        case ZOOKEEPER: /* This case is only compiled in when BUILD_ZOOKEEPER is defined. */
+            return zk_drl_init(comm, id, &limiter, config); /* The only branch that uses config and the extern limiter. */
+        break; /* Unreachable: the case returns on the line above. */
+
+#endif
+
+        default: /* Any membership value without a case above ends up here. */
+            printlog(LOG_CRITICAL, "drl_state.c: This shouldn't happen!\n");
+            return EINVAL;
+    }
+}
+
+static void group_membership_teardown(comm_t *comm) { /* Calls the teardown routine (swim_teardown or zk_drl_close) matching comm->gossip.membership. */
+    switch (comm->gossip.membership) {
+        case SWIM:
+            swim_teardown(comm);
+        break;
+
+#ifdef BUILD_ZOOKEEPER
+
+        case ZOOKEEPER: /* This case is only compiled in when BUILD_ZOOKEEPER is defined. */
+            zk_drl_close(comm);
+        break;
+
+#endif
+
+        default: /* Any membership value without a case above: it is only logged, nothing is torn down. */
+            printlog(LOG_CRITICAL, "drl_state.c: This shouldn't happen!\n");
+    }
+}
+
+void null_restart_function(comm_t *comm, int32_t view_number) { /* No-op restart callback; new_comm installs it as the default comm->restart_function for both comm fabrics. */
+    /* Intentionally empty: both arguments are ignored. */
+}
+
 int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) {
     int i;
+    int result = 0;
 
     memset(comm, 0, sizeof(comm_t));
 
@@ -40,17 +89,32 @@ int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) {
     comm->transport_proto = UDP;
     comm->remote_node_count = config->peer_count;
     comm->gossip.gossip_branch = config->branch;
+    comm->gossip.membership = config->membership;
+    comm->gossip.failure_behavior = config->failure_behavior;
     comm->gossip.weight = 1.0;
 
     pthread_mutex_init(&comm->lock, NULL);
-
-    /* Set send function. */
+    
+    // allocate memory to the indices
+    comm->indices = (int*) malloc(sizeof(int)*comm->remote_node_count);
+    memset(comm->indices, 0, sizeof(int)*comm->remote_node_count);
+    for(i = 0; i < comm->remote_node_count; i++)
+        comm->indices[i] = i;
+    comm->shuffle_index = comm->remote_node_count;
+
+    /* Set default comm function pointers. These may get overwritten later
+     * by more specific initialization routines such as group membership
+     * init calls. */
     switch (config->commfabric) {
         case COMM_MESH:
             comm->send_function = send_udp_mesh;
+            comm->recv_function = recv_mesh;
+            comm->restart_function = null_restart_function;
             break;
         case COMM_GOSSIP:
             comm->send_function = send_udp_gossip;
+            comm->recv_function = recv_gossip;
+            comm->restart_function = null_restart_function;
             break;
     }
 
@@ -61,8 +125,7 @@ int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) {
     }
 
     /* Allocate remote_limiters array and fill it in. Add remotes to map. */
-    comm->remote_limiters =
-                        malloc(config->peer_count * sizeof(remote_limiter_t));
+    comm->remote_limiters = malloc(config->peer_count * sizeof(remote_limiter_t));
 
     if (comm->remote_limiters == NULL) {
         pthread_mutex_destroy(&comm->lock);
@@ -76,13 +139,18 @@ int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) {
         comm->remote_limiters[i].addr = remote_nodes[i].addr;
         comm->remote_limiters[i].port = remote_nodes[i].port;
         comm->remote_limiters[i].outgoing.next_seqno = 1;
+        comm->remote_limiters[i].reachability = REACHABLE;
+        comm->remote_limiters[i].awol = 0;
+        comm->remote_limiters[i].count_rounds = 0;
+        comm->remote_limiters[i].count_awol = 0;
+        comm->remote_limiters[i].count_alive = 0;
         map_insert(comm->remote_node_map, (void *) &(remote_nodes[i]),
                    sizeof(remote_node_t), &comm->remote_limiters[i]);
     }
-
-    /* Allocate and initialize retrys. */
-    comm->retrys = malloc(config->branch * sizeof(int));
-    if (comm->retrys == NULL) {
+   
+    /* Allocate and initialize selected. */
+    comm->selected = malloc(config->branch * sizeof(int));
+    if (comm->selected == NULL) {
         pthread_mutex_destroy(&comm->lock);
         free_map(comm->remote_node_map, 0);
         free(comm->remote_limiters);
@@ -90,14 +158,28 @@ int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) {
     }
 
     for (i = 0; i < config->branch; ++i) {
-        comm->retrys[i] = -1;
+        comm->selected[i] = -1;
     }
 
-    return 0;
+    if (config->commfabric == COMM_GOSSIP) {
+        result = group_membership_init(comm, config->id, config);
+        if (result) {
+            pthread_mutex_destroy(&comm->lock);
+            free_map(comm->remote_node_map, 0);
+            free(comm->remote_limiters);
+            free(comm->selected);
+        }
+    }
+
+    return result;
 }
 
 void free_comm(comm_t *comm) {
     if (comm) {
+        if (comm->comm_fabric == COMM_GOSSIP) {
+            group_membership_teardown(comm);
+        }
+
         if (comm->remote_limiters) {
             free(comm->remote_limiters);
         }
@@ -112,53 +194,76 @@ void free_comm(comm_t *comm) {
 
         pthread_mutex_destroy(&comm->lock);
 
-        if (comm->retrys) {
-            free(comm->retrys);
+        if (comm->selected) {
+            free(comm->selected);
         }
     }
 }
 
-int read_comm(comm_t *comm, double *aggregate, double decayto) {
+int read_comm(double *aggregate, uint32_t *effective_global, comm_t *comm, uint32_t global_limit) {
     remote_limiter_t *remote;
 
     pthread_mutex_lock(&comm->lock);
     if (comm->comm_fabric == COMM_MESH) {
         *aggregate = 0;
+        *effective_global = global_limit;
         map_reset_iterate(comm->remote_node_map);
         while ((remote = map_next(comm->remote_node_map))) {
-            /* remote->rate corresponds to the rate (GRD) or weight (FPS)
-             * in generated by the peer remote. */
-            *aggregate += remote->rate;
-
-            /* If we continue to read it without having heard an update,
-             * we start to make the peer's value approach decayto, getting
-             * half of the way there each time. */
-            if (remote->awol >= REMOTE_AWOL_THRESHOLD) {
+            if (remote->reachability != REACHABLE) {
                 printlog(LOG_WARN, "AWOL remote limiter detected.\n");
-                remote->rate += ((decayto - remote->rate) / 2);
+                *effective_global -= (global_limit / (comm->remote_node_count + 1));
             } else {
-                remote->awol++;
+                /* remote->rate corresponds to the rate (GRD) or weight (FPS)
+                 * generated by the remote peer. */
+                *aggregate += remote->rate;
             }
         }
         *aggregate += comm->local_rate;
     } else if (comm->comm_fabric == COMM_GOSSIP) {
         double value = 0;
+        int i;
         value = (comm->gossip.value / comm->gossip.weight);
         value *= (comm->remote_node_count + 1);
 
-        /* Keep around the last value so that we don't stupidly pick 0 when
-         * we're negative.  If we pick 0, it looks to the limiter like it
-         * has free reign and it will take 100% of the rate allocation for
-         * itself. */
-        if (value <= 0) {
-            //*aggregate = comm->gossip.last_nonzero;
-            *aggregate = 0;
-            //printf("*****Gossip value is %.3f  (%u)  ((%d))\n", value, *aggregate, (int) *aggregate);
-        } else {
-            *aggregate = value;
-            comm->gossip.last_nonzero = *aggregate;
-            //printf("Gossip value is %.3f  (%u)  ((%d))\n", value, *aggregate, (int) *aggregate);
+        /* Look up the failure handling policy and check to see if it is
+         * currently relevant. */
+        if (comm->gossip.failure_behavior == PANIC) {
+            int panic = 0;
+            if (!comm->connected) {
+                panic = 1;
+            }
+
+            for (i = 0; i < comm->remote_node_count; ++i) {
+                if (comm->remote_limiters[i].reachability != REACHABLE) {
+                    panic = 1;
+                }
+            }
+
+            if (panic) {
+                printlog(LOG_DEBUG, "GOSSIP: Panicking!\n");
+                *aggregate = comm->local_rate;
+                *effective_global = (global_limit / (comm->remote_node_count + 1));
+            } else {
+                *aggregate = (value > 0) ? value : 0;
+                *effective_global = global_limit;
+            }
+        } else if (comm->gossip.failure_behavior == QUORUM) {
+            *effective_global = global_limit;
+            if (comm->connected) {
+                for (i = 0; i < comm->remote_node_count; ++i) {
+                    if (comm->remote_limiters[i].reachability != REACHABLE) {
+                        *effective_global -= (global_limit / (comm->remote_node_count + 1));
+                    }
+                }
+                *aggregate = (value > 0) ? value : 0;
+            } else {
+                /* Not part of the Quorum - do 1/n. */
+                printlog(LOG_DEBUG, "GOSSIP: Not in the quorum...Panicking!\n");
+                *aggregate = comm->local_rate;
+                *effective_global = (global_limit / (comm->remote_node_count + 1));
+            }
         }
+        printlog(LOG_DEBUG, "GOSSIP: Read aggregate of %.3f from comm layer.\n", *aggregate);
     } else {
         printlog(LOG_CRITICAL, "read_comm: Invalid comm fabric: %d.\n",
                  comm->comm_fabric);
@@ -167,8 +272,6 @@ int read_comm(comm_t *comm, double *aggregate, double decayto) {
     }
     pthread_mutex_unlock(&comm->lock);
 
-    //printf("read: %.3f\n", *aggregate);
-
     return 0;
 }
 
@@ -177,14 +280,11 @@ int write_local_value(comm_t *comm, const double value) {
     if (comm->comm_fabric == COMM_MESH) {
         comm->last_local_rate = comm->local_rate;
         comm->local_rate = value;
-        comm->rate_change = comm->local_rate - comm->last_local_rate;
     } else if (comm->comm_fabric == COMM_GOSSIP) {
         comm->last_local_rate = comm->local_rate;
         comm->local_rate = value;
-        comm->rate_change = comm->local_rate - comm->last_local_rate;
-        /*printf("new: %f, old: %f, weight: %f, diff: %f\n", comm->gossip.value + (comm->gossip.weight * comm->rate_change), comm->gossip.value, comm->gossip.weight, comm->rate_change);*/
-        /*comm->gossip.value = comm->gossip.value + (comm->gossip.weight * comm->rate_change);*/
-        comm->gossip.value += comm->rate_change;
+        comm->gossip.value += (comm->local_rate - comm->last_local_rate);
+        printlog(LOG_DEBUG, "GOSSIP: value: %.3f, new gossip.value: %.3f\n", value, comm->gossip.value);
     }
     else {
         printlog(LOG_CRITICAL, "write_local_value: Invalid comm fabric: %d.\n",
@@ -210,10 +310,14 @@ int send_update(comm_t *comm, uint32_t id) {
 }
 
 void *limiter_receive_thread(void *unused) {
+    printlog(LOG_DEBUG, "GOSSIP: Starting the limiter_receive thread.\n");
     sigset_t signal_mask;
 
     sigemptyset(&signal_mask);
     sigaddset(&signal_mask, SIGHUP);
+    sigaddset(&signal_mask, SIGUSR1);
+    sigaddset(&signal_mask, SIGUSR2);
+    sigaddset(&signal_mask, SIGRTMAX);
     pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
 
     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);