From: Kevin Webb Date: Sun, 12 Apr 2009 22:57:56 +0000 (+0000) Subject: Fixes & tweaks for sample and hold accounting. X-Git-Tag: DistributedRateLimiting-0.1-0~30 X-Git-Url: http://git.onelab.eu/?p=distributedratelimiting.git;a=commitdiff_plain;h=762f4e00371f2a8e01a6cf60867f837b90460a60 Fixes & tweaks for sample and hold accounting. Attempting to make gossip detect and handle unreachable nodes. Not stable yet... --- diff --git a/drl/drl_state.c b/drl/drl_state.c index ea68f51..ef57abe 100644 --- a/drl/drl_state.c +++ b/drl/drl_state.c @@ -133,7 +133,7 @@ int read_comm(comm_t *comm, double *aggregate, double decayto) { /* If we continue to read it without having heard an update, * we start to make the peer's value approach decayto, getting * half of the way there each time. */ - if (remote->awol >= REMOTE_AWOL_THRESHOLD) { + if (remote->awol >= MESH_REMOTE_AWOL_THRESHOLD) { printlog(LOG_WARN, "AWOL remote limiter detected.\n"); remote->rate += ((decayto - remote->rate) / 2); } else { @@ -142,6 +142,8 @@ int read_comm(comm_t *comm, double *aggregate, double decayto) { } *aggregate += comm->local_rate; } else if (comm->comm_fabric == COMM_GOSSIP) { + int i; + int threshold = GOSSIP_REMOTE_AWOL_THRESHOLD; double value = 0; value = (comm->gossip.value / comm->gossip.weight); value *= (comm->remote_node_count + 1); @@ -149,15 +151,30 @@ int read_comm(comm_t *comm, double *aggregate, double decayto) { /* Keep around the last value so that we don't stupidly pick 0 when * we're negative. If we pick 0, it looks to the limiter like it * has free reign and it will take 100% of the rate allocation for - * itself. */ + * itself. This is a lie. Open question what to do here... FIXME: Use decayto?*/ if (value <= 0) { //*aggregate = comm->gossip.last_nonzero; *aggregate = 0; - //printf("*****Gossip value is %.3f (%u) ((%d))\n", value, *aggregate, (int) *aggregate); + printlog(LOG_DEBUG, "Gossip: Read aggregate of 0 from comm layer.\n"); } else { *aggregate = value; comm->gossip.last_nonzero = *aggregate; - //printf("Gossip value is %.3f (%u) ((%d))\n", value, *aggregate, (int) *aggregate); + printlog(LOG_DEBUG, "Gossip: Read aggregate of %.3f from comm layer.\n", value); + } + + for (i = 0; i < comm->remote_node_count; ++i) { + if (comm->remote_limiters[i].awol == threshold) { + /* Re-claim any value/weight sent. */ + comm->gossip.value += comm->remote_limiters[i].outgoing.saved_value; + comm->gossip.weight += comm->remote_limiters[i].outgoing.saved_weight; + + comm->remote_limiters[i].outgoing.saved_value = 0.0; + comm->remote_limiters[i].outgoing.saved_weight = 0.0; + + comm->remote_limiters[i].awol += 1; + } else if (comm->remote_limiters[i].awol < threshold) { + comm->remote_limiters[i].awol += 1; + } } } else { printlog(LOG_CRITICAL, "read_comm: Invalid comm fabric: %d.\n", @@ -167,8 +184,6 @@ int read_comm(comm_t *comm, double *aggregate, double decayto) { } pthread_mutex_unlock(&comm->lock); - //printf("read: %.3f\n", *aggregate); - return 0; } diff --git a/drl/drl_state.h b/drl/drl_state.h index 7715a36..950ea7e 100644 --- a/drl/drl_state.h +++ b/drl/drl_state.h @@ -31,7 +31,10 @@ #define MAX_IDENTS (1024) #define MAX_LIMITERS (128) -#define REMOTE_AWOL_THRESHOLD (5) +#define MESH_REMOTE_AWOL_THRESHOLD (5) + +//FIXME: Make this more scientific? +#define GOSSIP_REMOTE_AWOL_THRESHOLD (10 * comm->remote_node_count / comm->gossip.gossip_branch) enum transports { UDP, TCP }; diff --git a/drl/peer_comm.c b/drl/peer_comm.c index 41ed778..2da6737 100644 --- a/drl/peer_comm.c +++ b/drl/peer_comm.c @@ -195,7 +195,6 @@ void limiter_receive() { for (i = 0; i < ident->comm.gossip.gossip_branch; ++i) { if (ident->comm.retrys[i] >= 0 && remote == &ident->comm.remote_limiters[ident->comm.retrys[i]]) { - //printf("clearing spot %d, it was %d\n", i, ident->retrys[i]); ident->comm.retrys[i] = -2; } } @@ -210,6 +209,7 @@ void limiter_receive() { ident->comm.gossip.value += msg.value; ident->comm.gossip.weight += msg.weight; send_ack(ident, remote, msg.seqno); + remote->awol = 0; } else if (msg.seqno > remote->incoming.seen_seqno) { /* Only some of the message is old news. */ double diff_value = msg.value - remote->incoming.saved_value; @@ -222,6 +222,7 @@ void limiter_receive() { ident->comm.gossip.value += diff_value; ident->comm.gossip.weight += diff_weight; send_ack(ident, remote, msg.seqno); + remote->awol = 0; } else { /* The entire message is old news. (Duplicate). */ /* Do nothing. */ @@ -457,11 +458,19 @@ int send_udp_mesh(comm_t *comm, uint32_t id, int sock) { int send_udp_gossip(comm_t *comm, uint32_t id, int sock) { int i, j, targetid; + int awol_threshold = GOSSIP_REMOTE_AWOL_THRESHOLD; + int rand_count; //HACK... int result = 0; remote_limiter_t *remote; struct sockaddr_in toaddr; double msg_value, msg_weight; + /* This is the factor for the portion of value/weight to keep locally. + * Normally this is 1, meaning that we retain the same amount of value/weight + * that was sent to the peers. In the case of not being able to send to a + * peer though, we increment this to reclaim the value/weight locally. */ + int message_portion = 1; + memset(&toaddr, 0, sizeof(struct sockaddr_in)); toaddr.sin_family = AF_INET; @@ -476,24 +485,44 @@ int send_udp_gossip(comm_t *comm, uint32_t id, int sock) { if (comm->retrys[i] >= 0) { remote = &comm->remote_limiters[comm->retrys[i]]; targetid = comm->retrys[i]; - //printf("%d:d:%d, ", i, comm->retrys[i]); + + if (remote->awol > awol_threshold) { + message_portion += 1; + printlog(LOG_DEBUG, "Gossip: Ignoring AWOL peer id %d.\n", comm->retrys[i]); + comm->retrys[i] = -1; + continue; + } } else { targetid = -2; + rand_count = 0; - while (targetid == -2) { + while (targetid == -2 && rand_count < 50) { targetid = myrand() % comm->remote_node_count; + rand_count += 1; + /* Don't select an already-used index. */ for (j = 0; j < comm->gossip.gossip_branch; ++j) { - if (targetid == comm->retrys[j]) { + if (targetid == comm->retrys[j] || comm->remote_limiters[targetid].awol > awol_threshold) { + printlog(LOG_DEBUG, "Gossip: disqualified targetid %d. retrys[j] is %d, and remote awol count is %d\n", targetid, comm->retrys[j], comm->remote_limiters[targetid].awol); targetid = -2; break; } } } + if (targetid < 0) { + /* Couldn't find a suitable peer to send to... */ + message_portion += 1; + printlog(LOG_DEBUG, "Gossip: exhausted random peer search.\n"); + continue; + } else { + printlog(LOG_DEBUG, "Gossip: settled on peer id %d.\n", targetid); + } + remote = &comm->remote_limiters[targetid]; - //printf("%d:r:%d, ", i, targetid); } + + comm->retrys[i] = targetid; toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */ toaddr.sin_port = remote->port; @@ -511,6 +540,15 @@ int send_udp_gossip(comm_t *comm, uint32_t id, int sock) { remote->outgoing.saved_value += msg_value; remote->outgoing.saved_weight += msg_weight; +#ifdef ALLOW_PARTITION + + if (do_partition && ((partition_set & (1 << targetid)) == 0)) { + printlog(LOG_DEBUG, "Partition: Gossip: ignoring targetid %d\n", targetid); + continue; + } + +#endif + message_to_nbo(&msg); if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) { @@ -518,13 +556,10 @@ int send_udp_gossip(comm_t *comm, uint32_t id, int sock) { result = errno; break; } - - comm->retrys[i] = targetid; } - //printf("\n"); - comm->gossip.value = msg_value; - comm->gossip.weight = msg_weight; + comm->gossip.value = msg_value * message_portion; + comm->gossip.weight = msg_weight * message_portion; return result; } diff --git a/drl/samplehold.c b/drl/samplehold.c index 8c566d4..f98e5df 100644 --- a/drl/samplehold.c +++ b/drl/samplehold.c @@ -86,14 +86,14 @@ sampled_flow_table sampled_table_create(uint32_t (*hash_function)(const key_flow /* Allocate the backing and give it a little bit extra to deal with variance. */ table->largest = NULL; - table->backing = malloc(sizeof(sampled_flow) * table->capacity * 1.05); + table->backing = malloc(sizeof(sampled_flow) * table->capacity * SAMPLEHOLD_BONUS_FACTOR); if (table->backing == NULL) { free(table); return NULL; } - memset(table->backing, 0, sizeof(sampled_flow) * table->capacity); + memset(table->backing, 0, sizeof(sampled_flow) * table->capacity * SAMPLEHOLD_BONUS_FACTOR); srand(time(NULL)); @@ -279,6 +279,15 @@ void sampled_table_update_flows(sampled_flow_table table, struct timeval now, do struct in_addr src, dst; char sip[22], dip[22]; + /* Reset statistics. */ + table->common->num_flows = 0; + table->common->num_flows_5k = 0; + table->common->num_flows_10k = 0; + table->common->num_flows_20k = 0; + table->common->num_flows_50k = 0; + table->common->avg_rate = 0; + /* End statistics. */ + /* Update common aggregate information. */ time_delta = timeval_subtract(now, table->common->last_update); @@ -340,6 +349,22 @@ void sampled_table_update_flows(sampled_flow_table table, struct timeval now, do table->largest = &table->backing[i]; } + if (table->backing[i].rate > 51200) { + table->common->num_flows_50k += 1; + table->common->num_flows_20k += 1; + table->common->num_flows_10k += 1; + table->common->num_flows_5k += 1; + } else if (table->backing[i].rate > 20480) { + table->common->num_flows_20k += 1; + table->common->num_flows_10k += 1; + table->common->num_flows_5k += 1; + } else if (table->backing[i].rate > 10240) { + table->common->num_flows_10k += 1; + table->common->num_flows_5k += 1; + } else if (table->backing[i].rate > 5120) { + table->common->num_flows_5k += 1; + } + table->common->num_flows += 1; /* Print debugging info. */ diff --git a/drl/samplehold.h b/drl/samplehold.h index f96fa26..6c992fd 100644 --- a/drl/samplehold.h +++ b/drl/samplehold.h @@ -30,8 +30,11 @@ #define RANDOM_GRANULARITY (1000) -#define SAMPLEHOLD_PERCENTAGE (5) +// FIXME: In reality, you probably don't want this higher than 5 +//#define SAMPLEHOLD_PERCENTAGE (5) +#define SAMPLEHOLD_PERCENTAGE (10) #define SAMPLEHOLD_OVERFACTOR (10) +#define SAMPLEHOLD_BONUS_FACTOR (1.05) /** In-table representation of a flow that has been sampled. */ typedef struct sampled_flow { diff --git a/drl/standard.c b/drl/standard.c index 9c996e3..103a23c 100644 --- a/drl/standard.c +++ b/drl/standard.c @@ -314,22 +314,18 @@ void standard_table_update_flows(standard_flow_table table, struct timeval now, table->common->num_flows_20k += 1; table->common->num_flows_10k += 1; table->common->num_flows_5k += 1; - table->common->num_flows += 1; } else if (current->rate > 20480) { table->common->num_flows_20k += 1; table->common->num_flows_10k += 1; table->common->num_flows_5k += 1; - table->common->num_flows += 1; } else if (current->rate > 10240) { table->common->num_flows_10k += 1; table->common->num_flows_5k += 1; - table->common->num_flows += 1; } else if (current->rate > 5120) { table->common->num_flows_5k += 1; - table->common->num_flows += 1; - } else { - table->common->num_flows += 1; - } + } + + table->common->num_flows += 1; src.s_addr = ntohl(current->source_ip); dst.s_addr = ntohl(current->dest_ip);