Attempting to make gossip detect and handle unreachable nodes. Not stable yet...
/* If we continue to read it without having heard an update,
* we start to make the peer's value approach decayto, getting
* half of the way there each time. */
- if (remote->awol >= REMOTE_AWOL_THRESHOLD) {
+ if (remote->awol >= MESH_REMOTE_AWOL_THRESHOLD) {
printlog(LOG_WARN, "AWOL remote limiter detected.\n");
remote->rate += ((decayto - remote->rate) / 2);
} else {
}
*aggregate += comm->local_rate;
} else if (comm->comm_fabric == COMM_GOSSIP) {
+ int i;
+ int threshold = GOSSIP_REMOTE_AWOL_THRESHOLD;
double value = 0;
value = (comm->gossip.value / comm->gossip.weight);
value *= (comm->remote_node_count + 1);
/* Keep around the last value so that we don't stupidly pick 0 when
* we're negative. If we pick 0, it looks to the limiter like it
* has free rein and it will take 100% of the rate allocation for
- * itself. */
+ * itself. NOTE: this comment is out of date -- the code below currently sets the aggregate to 0 rather than last_nonzero. Open question what to do here... FIXME: Use decayto?*/
if (value <= 0) {
//*aggregate = comm->gossip.last_nonzero;
*aggregate = 0;
- //printf("*****Gossip value is %.3f (%u) ((%d))\n", value, *aggregate, (int) *aggregate);
+ printlog(LOG_DEBUG, "Gossip: Read aggregate of 0 from comm layer.\n");
} else {
*aggregate = value;
comm->gossip.last_nonzero = *aggregate;
- //printf("Gossip value is %.3f (%u) ((%d))\n", value, *aggregate, (int) *aggregate);
+ printlog(LOG_DEBUG, "Gossip: Read aggregate of %.3f from comm layer.\n", value);
+ }
+
+ for (i = 0; i < comm->remote_node_count; ++i) {
+ if (comm->remote_limiters[i].awol == threshold) {
+ /* Re-claim any value/weight sent. */
+ comm->gossip.value += comm->remote_limiters[i].outgoing.saved_value;
+ comm->gossip.weight += comm->remote_limiters[i].outgoing.saved_weight;
+
+ comm->remote_limiters[i].outgoing.saved_value = 0.0;
+ comm->remote_limiters[i].outgoing.saved_weight = 0.0;
+
+ comm->remote_limiters[i].awol += 1;
+ } else if (comm->remote_limiters[i].awol < threshold) {
+ comm->remote_limiters[i].awol += 1;
+ }
}
} else {
printlog(LOG_CRITICAL, "read_comm: Invalid comm fabric: %d.\n",
}
pthread_mutex_unlock(&comm->lock);
- //printf("read: %.3f\n", *aggregate);
-
return 0;
}
#define MAX_IDENTS (1024)
#define MAX_LIMITERS (128)
-#define REMOTE_AWOL_THRESHOLD (5)
+#define MESH_REMOTE_AWOL_THRESHOLD (5)
+
+//FIXME: Make this more scientific?
+#define GOSSIP_REMOTE_AWOL_THRESHOLD (10 * comm->remote_node_count / comm->gossip.gossip_branch)
enum transports { UDP, TCP };
for (i = 0; i < ident->comm.gossip.gossip_branch; ++i) {
if (ident->comm.retrys[i] >= 0 &&
remote == &ident->comm.remote_limiters[ident->comm.retrys[i]]) {
- //printf("clearing spot %d, it was %d\n", i, ident->retrys[i]);
ident->comm.retrys[i] = -2;
}
}
ident->comm.gossip.value += msg.value;
ident->comm.gossip.weight += msg.weight;
send_ack(ident, remote, msg.seqno);
+ remote->awol = 0;
} else if (msg.seqno > remote->incoming.seen_seqno) {
/* Only some of the message is old news. */
double diff_value = msg.value - remote->incoming.saved_value;
ident->comm.gossip.value += diff_value;
ident->comm.gossip.weight += diff_weight;
send_ack(ident, remote, msg.seqno);
+ remote->awol = 0;
} else {
/* The entire message is old news. (Duplicate). */
/* Do nothing. */
int send_udp_gossip(comm_t *comm, uint32_t id, int sock) {
int i, j, targetid;
+ int awol_threshold = GOSSIP_REMOTE_AWOL_THRESHOLD;
+ int rand_count; //HACK...
int result = 0;
remote_limiter_t *remote;
struct sockaddr_in toaddr;
double msg_value, msg_weight;
+ /* This is the factor for the portion of value/weight to keep locally.
+ * Normally this is 1, meaning that we retain the same amount of value/weight
+ * that was sent to the peers. In the case of not being able to send to a
+ * peer though, we increment this to reclaim the value/weight locally. */
+ int message_portion = 1;
+
memset(&toaddr, 0, sizeof(struct sockaddr_in));
toaddr.sin_family = AF_INET;
if (comm->retrys[i] >= 0) {
remote = &comm->remote_limiters[comm->retrys[i]];
targetid = comm->retrys[i];
- //printf("%d:d:%d, ", i, comm->retrys[i]);
+
+ if (remote->awol > awol_threshold) {
+ message_portion += 1;
+ printlog(LOG_DEBUG, "Gossip: Ignoring AWOL peer id %d.\n", comm->retrys[i]);
+ comm->retrys[i] = -1;
+ continue;
+ }
} else {
targetid = -2;
+ rand_count = 0;
- while (targetid == -2) {
+ while (targetid == -2 && rand_count < 50) {
targetid = myrand() % comm->remote_node_count;
+ rand_count += 1;
+ /* Don't select an already-used index. */
for (j = 0; j < comm->gossip.gossip_branch; ++j) {
- if (targetid == comm->retrys[j]) {
+ if (targetid == comm->retrys[j] || comm->remote_limiters[targetid].awol > awol_threshold) {
+ printlog(LOG_DEBUG, "Gossip: disqualified targetid %d. retrys[j] is %d, and remote awol count is %d\n", targetid, comm->retrys[j], comm->remote_limiters[targetid].awol);
targetid = -2;
break;
}
}
}
+ if (targetid < 0) {
+ /* Couldn't find a suitable peer to send to... */
+ message_portion += 1;
+ printlog(LOG_DEBUG, "Gossip: exhausted random peer search.\n");
+ continue;
+ } else {
+ printlog(LOG_DEBUG, "Gossip: settled on peer id %d.\n", targetid);
+ }
+
remote = &comm->remote_limiters[targetid];
- //printf("%d:r:%d, ", i, targetid);
}
+
+ comm->retrys[i] = targetid;
toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
toaddr.sin_port = remote->port;
remote->outgoing.saved_value += msg_value;
remote->outgoing.saved_weight += msg_weight;
+#ifdef ALLOW_PARTITION
+
+ if (do_partition && ((partition_set & (1 << targetid)) == 0)) {
+ printlog(LOG_DEBUG, "Partition: Gossip: ignoring targetid %d\n", targetid);
+ continue;
+ }
+
+#endif
+
message_to_nbo(&msg);
if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
result = errno;
break;
}
-
- comm->retrys[i] = targetid;
}
- //printf("\n");
- comm->gossip.value = msg_value;
- comm->gossip.weight = msg_weight;
+ comm->gossip.value = msg_value * message_portion;
+ comm->gossip.weight = msg_weight * message_portion;
return result;
}
/* Allocate the backing and give it a little bit extra to deal with variance. */
table->largest = NULL;
- table->backing = malloc(sizeof(sampled_flow) * table->capacity * 1.05);
+ table->backing = malloc(sizeof(sampled_flow) * table->capacity * SAMPLEHOLD_BONUS_FACTOR);
if (table->backing == NULL) {
free(table);
return NULL;
}
- memset(table->backing, 0, sizeof(sampled_flow) * table->capacity);
+ memset(table->backing, 0, sizeof(sampled_flow) * table->capacity * SAMPLEHOLD_BONUS_FACTOR);
srand(time(NULL));
struct in_addr src, dst;
char sip[22], dip[22];
+ /* Reset statistics. */
+ table->common->num_flows = 0;
+ table->common->num_flows_5k = 0;
+ table->common->num_flows_10k = 0;
+ table->common->num_flows_20k = 0;
+ table->common->num_flows_50k = 0;
+ table->common->avg_rate = 0;
+ /* End statistics. */
+
/* Update common aggregate information. */
time_delta = timeval_subtract(now, table->common->last_update);
table->largest = &table->backing[i];
}
+ if (table->backing[i].rate > 51200) {
+ table->common->num_flows_50k += 1;
+ table->common->num_flows_20k += 1;
+ table->common->num_flows_10k += 1;
+ table->common->num_flows_5k += 1;
+ } else if (table->backing[i].rate > 20480) {
+ table->common->num_flows_20k += 1;
+ table->common->num_flows_10k += 1;
+ table->common->num_flows_5k += 1;
+ } else if (table->backing[i].rate > 10240) {
+ table->common->num_flows_10k += 1;
+ table->common->num_flows_5k += 1;
+ } else if (table->backing[i].rate > 5120) {
+ table->common->num_flows_5k += 1;
+ }
+
table->common->num_flows += 1;
/* Print debugging info. */
#define RANDOM_GRANULARITY (1000)
-#define SAMPLEHOLD_PERCENTAGE (5)
+// FIXME: In reality, you probably don't want this higher than 5
+//#define SAMPLEHOLD_PERCENTAGE (5)
+#define SAMPLEHOLD_PERCENTAGE (10)
#define SAMPLEHOLD_OVERFACTOR (10)
+#define SAMPLEHOLD_BONUS_FACTOR (1.05)
/** In-table representation of a flow that has been sampled. */
typedef struct sampled_flow {
table->common->num_flows_20k += 1;
table->common->num_flows_10k += 1;
table->common->num_flows_5k += 1;
- table->common->num_flows += 1;
} else if (current->rate > 20480) {
table->common->num_flows_20k += 1;
table->common->num_flows_10k += 1;
table->common->num_flows_5k += 1;
- table->common->num_flows += 1;
} else if (current->rate > 10240) {
table->common->num_flows_10k += 1;
table->common->num_flows_5k += 1;
- table->common->num_flows += 1;
} else if (current->rate > 5120) {
table->common->num_flows_5k += 1;
- table->common->num_flows += 1;
- } else {
- table->common->num_flows += 1;
- }
+ }
+
+ table->common->num_flows += 1;
src.s_addr = ntohl(current->source_ip);
dst.s_addr = ntohl(current->dest_ip);