Fixes & tweaks for sample and hold accounting.
authorKevin Webb <kcwebb@ucsd.edu>
Sun, 12 Apr 2009 22:57:56 +0000 (22:57 +0000)
committerKevin Webb <kcwebb@ucsd.edu>
Sun, 12 Apr 2009 22:57:56 +0000 (22:57 +0000)
Attempting to make gossip detect and handle unreachable nodes.  Not stable yet...

drl/drl_state.c
drl/drl_state.h
drl/peer_comm.c
drl/samplehold.c
drl/samplehold.h
drl/standard.c

index ea68f51..ef57abe 100644 (file)
@@ -133,7 +133,7 @@ int read_comm(comm_t *comm, double *aggregate, double decayto) {
             /* If we continue to read it without having heard an update,
              * we start to make the peer's value approach decayto, getting
              * half of the way there each time. */
-            if (remote->awol >= REMOTE_AWOL_THRESHOLD) {
+            if (remote->awol >= MESH_REMOTE_AWOL_THRESHOLD) {
                 printlog(LOG_WARN, "AWOL remote limiter detected.\n");
                 remote->rate += ((decayto - remote->rate) / 2);
             } else {
@@ -142,6 +142,8 @@ int read_comm(comm_t *comm, double *aggregate, double decayto) {
         }
         *aggregate += comm->local_rate;
     } else if (comm->comm_fabric == COMM_GOSSIP) {
+        int i;
+        int threshold = GOSSIP_REMOTE_AWOL_THRESHOLD;
         double value = 0;
         value = (comm->gossip.value / comm->gossip.weight);
         value *= (comm->remote_node_count + 1);
@@ -149,15 +151,30 @@ int read_comm(comm_t *comm, double *aggregate, double decayto) {
         /* Keep around the last value so that we don't stupidly pick 0 when
          * we're negative.  If we pick 0, it looks to the limiter like it
          * has free reign and it will take 100% of the rate allocation for
-         * itself. */
+         * itself. This is a lie.  Open question what to do here... FIXME: Use decayto?*/
         if (value <= 0) {
             //*aggregate = comm->gossip.last_nonzero;
             *aggregate = 0;
-            //printf("*****Gossip value is %.3f  (%u)  ((%d))\n", value, *aggregate, (int) *aggregate);
+            printlog(LOG_DEBUG, "Gossip: Read aggregate of 0 from comm layer.\n");
         } else {
             *aggregate = value;
             comm->gossip.last_nonzero = *aggregate;
-            //printf("Gossip value is %.3f  (%u)  ((%d))\n", value, *aggregate, (int) *aggregate);
+            printlog(LOG_DEBUG, "Gossip: Read aggregate of %.3f from comm layer.\n", value);
+        }
+
+        for (i = 0; i < comm->remote_node_count; ++i) {
+            if (comm->remote_limiters[i].awol == threshold) {
+                /* Re-claim any value/weight sent. */
+                comm->gossip.value += comm->remote_limiters[i].outgoing.saved_value;
+                comm->gossip.weight += comm->remote_limiters[i].outgoing.saved_weight;
+
+                comm->remote_limiters[i].outgoing.saved_value = 0.0;
+                comm->remote_limiters[i].outgoing.saved_weight = 0.0;
+
+                comm->remote_limiters[i].awol += 1;
+            } else if (comm->remote_limiters[i].awol < threshold) {
+                comm->remote_limiters[i].awol += 1;
+            }
         }
     } else {
         printlog(LOG_CRITICAL, "read_comm: Invalid comm fabric: %d.\n",
@@ -167,8 +184,6 @@ int read_comm(comm_t *comm, double *aggregate, double decayto) {
     }
     pthread_mutex_unlock(&comm->lock);
 
-    //printf("read: %.3f\n", *aggregate);
-
     return 0;
 }
 
index 7715a36..950ea7e 100644 (file)
 #define MAX_IDENTS (1024)
 #define MAX_LIMITERS (128)
 
-#define REMOTE_AWOL_THRESHOLD (5)
+#define MESH_REMOTE_AWOL_THRESHOLD (5)
+
+//FIXME: Make this more scientific?
+#define GOSSIP_REMOTE_AWOL_THRESHOLD (10 * comm->remote_node_count / comm->gossip.gossip_branch)
 
 enum transports { UDP, TCP };
 
index 41ed778..2da6737 100644 (file)
@@ -195,7 +195,6 @@ void limiter_receive() {
                     for (i = 0; i < ident->comm.gossip.gossip_branch; ++i) {
                         if (ident->comm.retrys[i] >= 0 &&
                             remote == &ident->comm.remote_limiters[ident->comm.retrys[i]]) {
-                                //printf("clearing spot %d, it was %d\n", i, ident->retrys[i]);
                                 ident->comm.retrys[i] = -2;
                         }
                     }
@@ -210,6 +209,7 @@ void limiter_receive() {
                     ident->comm.gossip.value += msg.value;
                     ident->comm.gossip.weight += msg.weight;
                     send_ack(ident, remote, msg.seqno);
+                    remote->awol = 0;
                 } else if (msg.seqno > remote->incoming.seen_seqno) {
                     /* Only some of the message is old news. */
                     double diff_value = msg.value - remote->incoming.saved_value;
@@ -222,6 +222,7 @@ void limiter_receive() {
                     ident->comm.gossip.value += diff_value;
                     ident->comm.gossip.weight += diff_weight;
                     send_ack(ident, remote, msg.seqno);
+                    remote->awol = 0;
                 } else {
                     /* The entire message is old news. (Duplicate). */
                     /* Do nothing. */
@@ -457,11 +458,19 @@ int send_udp_mesh(comm_t *comm, uint32_t id, int sock) {
 
 int send_udp_gossip(comm_t *comm, uint32_t id, int sock) {
     int i, j, targetid;
+    int awol_threshold = GOSSIP_REMOTE_AWOL_THRESHOLD;
+    int rand_count; //HACK...
     int result = 0;
     remote_limiter_t *remote;
     struct sockaddr_in toaddr;
     double msg_value, msg_weight;
 
+    /* This is the factor for the portion of value/weight to keep locally.
+     * Normally this is 1, meaning that we retain the same amount of value/weight
+     * that was sent to the peers.  In the case of not being able to send to a
+     * peer though, we increment this to reclaim the value/weight locally. */
+    int message_portion = 1;
+    
     memset(&toaddr, 0, sizeof(struct sockaddr_in));
     toaddr.sin_family = AF_INET;
 
@@ -476,24 +485,44 @@ int send_udp_gossip(comm_t *comm, uint32_t id, int sock) {
         if (comm->retrys[i] >= 0) {
             remote = &comm->remote_limiters[comm->retrys[i]];
             targetid = comm->retrys[i];
-            //printf("%d:d:%d, ", i, comm->retrys[i]);
+
+            if (remote->awol > awol_threshold) {
+                message_portion += 1;
+                printlog(LOG_DEBUG, "Gossip: Ignoring AWOL peer id %d.\n", comm->retrys[i]);
+                comm->retrys[i] = -1;
+                continue;
+            }
         } else {
             targetid = -2;
+            rand_count = 0;
 
-            while (targetid == -2) {
+            while (targetid == -2 && rand_count < 50) {
                 targetid = myrand() % comm->remote_node_count;
+                rand_count += 1;
 
+                /* Don't select an already-used index. */
                 for (j = 0; j < comm->gossip.gossip_branch; ++j) {
-                    if (targetid == comm->retrys[j]) {
+                    if (targetid == comm->retrys[j] || comm->remote_limiters[targetid].awol > awol_threshold) {
+                        printlog(LOG_DEBUG, "Gossip: disqualified targetid %d.  retrys[j] is %d, and remote awol count is %d\n", targetid, comm->retrys[j], comm->remote_limiters[targetid].awol);
                         targetid = -2;
                         break;
                     }
                 }
             }
 
+            if (targetid < 0) {
+                /* Couldn't find a suitable peer to send to... */
+                message_portion += 1;
+                printlog(LOG_DEBUG, "Gossip: exhausted random peer search.\n");
+                continue;
+            } else {
+                printlog(LOG_DEBUG, "Gossip: settled on peer id %d.\n", targetid);
+            }
+
             remote = &comm->remote_limiters[targetid];
-            //printf("%d:r:%d, ", i, targetid);
         }
+        
+        comm->retrys[i] = targetid;
 
         toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
         toaddr.sin_port = remote->port;
@@ -511,6 +540,15 @@ int send_udp_gossip(comm_t *comm, uint32_t id, int sock) {
         remote->outgoing.saved_value += msg_value;
         remote->outgoing.saved_weight += msg_weight;
 
+#ifdef ALLOW_PARTITION
+
+        if (do_partition && ((partition_set & (1 << targetid)) == 0)) {
+            printlog(LOG_DEBUG, "Partition: Gossip: ignoring targetid %d\n", targetid);
+            continue;
+        }
+
+#endif
+
         message_to_nbo(&msg);
 
         if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
@@ -518,13 +556,10 @@ int send_udp_gossip(comm_t *comm, uint32_t id, int sock) {
             result = errno;
             break;
         }
-
-        comm->retrys[i] = targetid;
     }
-    //printf("\n");
 
-    comm->gossip.value = msg_value;
-    comm->gossip.weight = msg_weight;
+    comm->gossip.value = msg_value * message_portion;
+    comm->gossip.weight = msg_weight * message_portion;
 
     return result;
 }
index 8c566d4..f98e5df 100644 (file)
@@ -86,14 +86,14 @@ sampled_flow_table sampled_table_create(uint32_t (*hash_function)(const key_flow
 
     /* Allocate the backing and give it a little bit extra to deal with variance. */
     table->largest = NULL;
-    table->backing = malloc(sizeof(sampled_flow) * table->capacity * 1.05);
+    table->backing = malloc(sizeof(sampled_flow) * table->capacity * SAMPLEHOLD_BONUS_FACTOR);
 
     if (table->backing == NULL) {
         free(table);
         return NULL;
     }
 
-    memset(table->backing, 0, sizeof(sampled_flow) * table->capacity);
+    memset(table->backing, 0, sizeof(sampled_flow) * table->capacity * SAMPLEHOLD_BONUS_FACTOR);
 
     srand(time(NULL));
 
@@ -279,6 +279,15 @@ void sampled_table_update_flows(sampled_flow_table table, struct timeval now, do
     struct in_addr src, dst;
     char sip[22], dip[22];
 
+    /* Reset statistics. */
+    table->common->num_flows = 0;
+    table->common->num_flows_5k = 0;
+    table->common->num_flows_10k = 0;
+    table->common->num_flows_20k = 0;
+    table->common->num_flows_50k = 0;
+    table->common->avg_rate = 0;
+    /* End statistics. */
+
     /* Update common aggregate information. */
     time_delta = timeval_subtract(now, table->common->last_update);
 
@@ -340,6 +349,22 @@ void sampled_table_update_flows(sampled_flow_table table, struct timeval now, do
                 table->largest = &table->backing[i];
             }
 
+            if (table->backing[i].rate > 51200) {
+                table->common->num_flows_50k += 1;
+                table->common->num_flows_20k += 1;
+                table->common->num_flows_10k += 1;
+                table->common->num_flows_5k += 1;
+            } else if (table->backing[i].rate > 20480) {
+                table->common->num_flows_20k += 1;
+                table->common->num_flows_10k += 1;
+                table->common->num_flows_5k += 1;
+            } else if (table->backing[i].rate > 10240) {
+                table->common->num_flows_10k += 1;
+                table->common->num_flows_5k += 1;
+            } else if (table->backing[i].rate > 5120) {
+                table->common->num_flows_5k += 1;
+            }
+
             table->common->num_flows += 1;
 
             /* Print debugging info. */
index f96fa26..6c992fd 100644 (file)
 
 #define RANDOM_GRANULARITY (1000)
 
-#define SAMPLEHOLD_PERCENTAGE (5)
+// FIXME: In reality, you probably don't want this higher than 5
+//#define SAMPLEHOLD_PERCENTAGE (5)
+#define SAMPLEHOLD_PERCENTAGE (10)
 #define SAMPLEHOLD_OVERFACTOR (10)
+#define SAMPLEHOLD_BONUS_FACTOR (1.05)
 
 /** In-table representation of a flow that has been sampled. */
 typedef struct sampled_flow {
index 9c996e3..103a23c 100644 (file)
@@ -314,22 +314,18 @@ void standard_table_update_flows(standard_flow_table table, struct timeval now,
             table->common->num_flows_20k += 1;
             table->common->num_flows_10k += 1;
             table->common->num_flows_5k += 1;
-            table->common->num_flows += 1;
         } else if (current->rate > 20480) {
             table->common->num_flows_20k += 1;
             table->common->num_flows_10k += 1;
             table->common->num_flows_5k += 1;
-            table->common->num_flows += 1;
         } else if (current->rate > 10240) {
             table->common->num_flows_10k += 1;
             table->common->num_flows_5k += 1;
-            table->common->num_flows += 1;
         } else if (current->rate > 5120) {
             table->common->num_flows_5k += 1;
-            table->common->num_flows += 1;
-        } else {
-            table->common->num_flows += 1;
-        }
+        } 
+
+        table->common->num_flows += 1;
 
         src.s_addr = ntohl(current->source_ip);
         dst.s_addr = ntohl(current->dest_ip);