X-Git-Url: http://git.onelab.eu/?p=distributedratelimiting.git;a=blobdiff_plain;f=drl%2Fpeer_comm.c;h=2da6737cb3886319bfe85aa3b25c7206e90a5713;hp=41ed778bdaf6bbe166035cab8d6c6bcc33e2aae8;hb=762f4e00371f2a8e01a6cf60867f837b90460a60;hpb=f83340496f632165030cc92cd98408a87082f6b1 diff --git a/drl/peer_comm.c b/drl/peer_comm.c index 41ed778..2da6737 100644 --- a/drl/peer_comm.c +++ b/drl/peer_comm.c @@ -195,7 +195,6 @@ void limiter_receive() { for (i = 0; i < ident->comm.gossip.gossip_branch; ++i) { if (ident->comm.retrys[i] >= 0 && remote == &ident->comm.remote_limiters[ident->comm.retrys[i]]) { - //printf("clearing spot %d, it was %d\n", i, ident->retrys[i]); ident->comm.retrys[i] = -2; } } @@ -210,6 +209,7 @@ void limiter_receive() { ident->comm.gossip.value += msg.value; ident->comm.gossip.weight += msg.weight; send_ack(ident, remote, msg.seqno); + remote->awol = 0; } else if (msg.seqno > remote->incoming.seen_seqno) { /* Only some of the message is old news. */ double diff_value = msg.value - remote->incoming.saved_value; @@ -222,6 +222,7 @@ void limiter_receive() { ident->comm.gossip.value += diff_value; ident->comm.gossip.weight += diff_weight; send_ack(ident, remote, msg.seqno); + remote->awol = 0; } else { /* The entire message is old news. (Duplicate). */ /* Do nothing. */ @@ -457,11 +458,19 @@ int send_udp_mesh(comm_t *comm, uint32_t id, int sock) { int send_udp_gossip(comm_t *comm, uint32_t id, int sock) { int i, j, targetid; + int awol_threshold = GOSSIP_REMOTE_AWOL_THRESHOLD; + int rand_count; //HACK... int result = 0; remote_limiter_t *remote; struct sockaddr_in toaddr; double msg_value, msg_weight; + /* This is the factor for the portion of value/weight to keep locally. + * Normally this is 1, meaning that we retain the same amount of value/weight + * that was sent to the peers. In the case of not being able to send to a + * peer though, we increment this to reclaim the value/weight locally. */ + int message_portion = 1; + memset(&toaddr, 0, sizeof(struct sockaddr_in)); toaddr.sin_family = AF_INET; @@ -476,24 +485,44 @@ int send_udp_gossip(comm_t *comm, uint32_t id, int sock) { if (comm->retrys[i] >= 0) { remote = &comm->remote_limiters[comm->retrys[i]]; targetid = comm->retrys[i]; - //printf("%d:d:%d, ", i, comm->retrys[i]); + + if (remote->awol > awol_threshold) { + message_portion += 1; + printlog(LOG_DEBUG, "Gossip: Ignoring AWOL peer id %d.\n", comm->retrys[i]); + comm->retrys[i] = -1; + continue; + } } else { targetid = -2; + rand_count = 0; - while (targetid == -2) { + while (targetid == -2 && rand_count < 50) { targetid = myrand() % comm->remote_node_count; + rand_count += 1; + /* Don't select an already-used index. */ for (j = 0; j < comm->gossip.gossip_branch; ++j) { - if (targetid == comm->retrys[j]) { + if (targetid == comm->retrys[j] || comm->remote_limiters[targetid].awol > awol_threshold) { + printlog(LOG_DEBUG, "Gossip: disqualified targetid %d. retrys[j] is %d, and remote awol count is %d\n", targetid, comm->retrys[j], comm->remote_limiters[targetid].awol); targetid = -2; break; } } } + if (targetid < 0) { + /* Couldn't find a suitable peer to send to... */ + message_portion += 1; + printlog(LOG_DEBUG, "Gossip: exhausted random peer search.\n"); + continue; + } else { + printlog(LOG_DEBUG, "Gossip: settled on peer id %d.\n", targetid); + } + remote = &comm->remote_limiters[targetid]; - //printf("%d:r:%d, ", i, targetid); } + + comm->retrys[i] = targetid; toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */ toaddr.sin_port = remote->port; @@ -511,6 +540,15 @@ int send_udp_gossip(comm_t *comm, uint32_t id, int sock) { remote->outgoing.saved_value += msg_value; remote->outgoing.saved_weight += msg_weight; +#ifdef ALLOW_PARTITION + + if (do_partition && ((partition_set & (1 << targetid)) == 0)) { + printlog(LOG_DEBUG, "Partition: Gossip: ignoring targetid %d\n", targetid); + continue; + } + +#endif + message_to_nbo(&msg); if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) { @@ -518,13 +556,10 @@ int send_udp_gossip(comm_t *comm, uint32_t id, int sock) { result = errno; break; } - - comm->retrys[i] = targetid; } - //printf("\n"); - comm->gossip.value = msg_value; - comm->gossip.weight = msg_weight; + comm->gossip.value = msg_value * message_portion; + comm->gossip.weight = msg_weight * message_portion; return result; }