Fixes & tweaks for sample and hold accounting.
[distributedratelimiting.git] / drl / drl_state.c
index ea68f51..ef57abe 100644 (file)
@@ -133,7 +133,7 @@ int read_comm(comm_t *comm, double *aggregate, double decayto) {
             /* If we continue to read it without having heard an update,
              * we start to make the peer's value approach decayto, getting
              * half of the way there each time. */
-            if (remote->awol >= REMOTE_AWOL_THRESHOLD) {
+            if (remote->awol >= MESH_REMOTE_AWOL_THRESHOLD) {
                 printlog(LOG_WARN, "AWOL remote limiter detected.\n");
                 remote->rate += ((decayto - remote->rate) / 2);
             } else {
@@ -142,6 +142,8 @@ int read_comm(comm_t *comm, double *aggregate, double decayto) {
         }
         *aggregate += comm->local_rate;
     } else if (comm->comm_fabric == COMM_GOSSIP) {
+        int i;
+        int threshold = GOSSIP_REMOTE_AWOL_THRESHOLD;
         double value = 0;
         value = (comm->gossip.value / comm->gossip.weight);
         value *= (comm->remote_node_count + 1);
@@ -149,15 +151,30 @@ int read_comm(comm_t *comm, double *aggregate, double decayto) {
         /* Keep around the last value so that we don't stupidly pick 0 when
          * we're negative.  If we pick 0, it looks to the limiter like it
          * has free reign and it will take 100% of the rate allocation for
-         * itself. */
+         * itself. This is a lie.  Open question what to do here... FIXME: Use decayto?*/
         if (value <= 0) {
             //*aggregate = comm->gossip.last_nonzero;
             *aggregate = 0;
-            //printf("*****Gossip value is %.3f  (%u)  ((%d))\n", value, *aggregate, (int) *aggregate);
+            printlog(LOG_DEBUG, "Gossip: Read aggregate of 0 from comm layer.\n");
         } else {
             *aggregate = value;
             comm->gossip.last_nonzero = *aggregate;
-            //printf("Gossip value is %.3f  (%u)  ((%d))\n", value, *aggregate, (int) *aggregate);
+            printlog(LOG_DEBUG, "Gossip: Read aggregate of %.3f from comm layer.\n", value);
+        }
+
+        for (i = 0; i < comm->remote_node_count; ++i) {
+            if (comm->remote_limiters[i].awol == threshold) {
+                /* Re-claim any value/weight sent. */
+                comm->gossip.value += comm->remote_limiters[i].outgoing.saved_value;
+                comm->gossip.weight += comm->remote_limiters[i].outgoing.saved_weight;
+
+                comm->remote_limiters[i].outgoing.saved_value = 0.0;
+                comm->remote_limiters[i].outgoing.saved_weight = 0.0;
+
+                comm->remote_limiters[i].awol += 1;
+            } else if (comm->remote_limiters[i].awol < threshold) {
+                comm->remote_limiters[i].awol += 1;
+            }
         }
     } else {
         printlog(LOG_CRITICAL, "read_comm: Invalid comm fabric: %d.\n",
@@ -167,8 +184,6 @@ int read_comm(comm_t *comm, double *aggregate, double decayto) {
     }
     pthread_mutex_unlock(&comm->lock);
 
-    //printf("read: %.3f\n", *aggregate);
-
     return 0;
 }