X-Git-Url: http://git.onelab.eu/?p=distributedratelimiting.git;a=blobdiff_plain;f=drl%2Fdrl_state.c;h=ef57abe0085952a1aef12b2b2d9dadf8c91c2b74;hp=ea68f511e2b11684be17bc4f8e1a707acbce5167;hb=762f4e00371f2a8e01a6cf60867f837b90460a60;hpb=f83340496f632165030cc92cd98408a87082f6b1 diff --git a/drl/drl_state.c b/drl/drl_state.c index ea68f51..ef57abe 100644 --- a/drl/drl_state.c +++ b/drl/drl_state.c @@ -133,7 +133,7 @@ int read_comm(comm_t *comm, double *aggregate, double decayto) { /* If we continue to read it without having heard an update, * we start to make the peer's value approach decayto, getting * half of the way there each time. */ - if (remote->awol >= REMOTE_AWOL_THRESHOLD) { + if (remote->awol >= MESH_REMOTE_AWOL_THRESHOLD) { printlog(LOG_WARN, "AWOL remote limiter detected.\n"); remote->rate += ((decayto - remote->rate) / 2); } else { @@ -142,6 +142,8 @@ int read_comm(comm_t *comm, double *aggregate, double decayto) { } *aggregate += comm->local_rate; } else if (comm->comm_fabric == COMM_GOSSIP) { + int i; + int threshold = GOSSIP_REMOTE_AWOL_THRESHOLD; double value = 0; value = (comm->gossip.value / comm->gossip.weight); value *= (comm->remote_node_count + 1); @@ -149,15 +151,30 @@ int read_comm(comm_t *comm, double *aggregate, double decayto) { /* Keep around the last value so that we don't stupidly pick 0 when * we're negative. If we pick 0, it looks to the limiter like it * has free reign and it will take 100% of the rate allocation for - * itself. */ + * itself. This is a lie. Open question what to do here... FIXME: Use decayto?*/ if (value <= 0) { //*aggregate = comm->gossip.last_nonzero; *aggregate = 0; - //printf("*****Gossip value is %.3f (%u) ((%d))\n", value, *aggregate, (int) *aggregate); + printlog(LOG_DEBUG, "Gossip: Read aggregate of 0 from comm layer.\n"); } else { *aggregate = value; comm->gossip.last_nonzero = *aggregate; - //printf("Gossip value is %.3f (%u) ((%d))\n", value, *aggregate, (int) *aggregate); + printlog(LOG_DEBUG, "Gossip: Read aggregate of %.3f from comm layer.\n", value); + } + + for (i = 0; i < comm->remote_node_count; ++i) { + if (comm->remote_limiters[i].awol == threshold) { + /* Re-claim any value/weight sent. */ + comm->gossip.value += comm->remote_limiters[i].outgoing.saved_value; + comm->gossip.weight += comm->remote_limiters[i].outgoing.saved_weight; + + comm->remote_limiters[i].outgoing.saved_value = 0.0; + comm->remote_limiters[i].outgoing.saved_weight = 0.0; + + comm->remote_limiters[i].awol += 1; + } else if (comm->remote_limiters[i].awol < threshold) { + comm->remote_limiters[i].awol += 1; + } } } else { printlog(LOG_CRITICAL, "read_comm: Invalid comm fabric: %d.\n", @@ -167,8 +184,6 @@ int read_comm(comm_t *comm, double *aggregate, double decayto) { } pthread_mutex_unlock(&comm->lock); - //printf("read: %.3f\n", *aggregate); - return 0; }