Long-outstanding commit. (Hopefully) Final version before running experiments for...
[distributedratelimiting.git] / drl / peer_comm.c
index cb30657..eba9637 100644 (file)
 #include "peer_comm.h"
 #include "logging.h"
 
+/* Artificially creates a network partition. */
+int do_partition = 0;
+int partition_set = 0xfffffff;
+
 extern limiter_t limiter;
 
 static const uint32_t MAGIC_MSG = 0x123123;
@@ -191,7 +195,6 @@ void limiter_receive() {
                     for (i = 0; i < ident->comm.gossip.gossip_branch; ++i) {
                         if (ident->comm.retrys[i] >= 0 &&
                             remote == &ident->comm.remote_limiters[ident->comm.retrys[i]]) {
-                                //printf("clearing spot %d, it was %d\n", i, ident->retrys[i]);
                                 ident->comm.retrys[i] = -2;
                         }
                     }
@@ -206,6 +209,7 @@ void limiter_receive() {
                     ident->comm.gossip.value += msg.value;
                     ident->comm.gossip.weight += msg.weight;
                     send_ack(ident, remote, msg.seqno);
+                    remote->awol = 0;
                 } else if (msg.seqno > remote->incoming.seen_seqno) {
                     /* Only some of the message is old news. */
                     double diff_value = msg.value - remote->incoming.saved_value;
@@ -218,6 +222,7 @@ void limiter_receive() {
                     ident->comm.gossip.value += diff_value;
                     ident->comm.gossip.weight += diff_weight;
                     send_ack(ident, remote, msg.seqno);
+                    remote->awol = 0;
                 } else {
                     /* The entire message is old news. (Duplicate). */
                     /* Do nothing. */
@@ -386,11 +391,24 @@ static void ident_receive(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock)
 }
 #endif
 
+/* Turn this on to simulate network partitions.
+ * Turn off for production settings. */
+//#define ALLOW_PARTITION
+
 int send_udp_mesh(comm_t *comm, uint32_t id, int sock) {
     int result = 0;
     remote_limiter_t *remote;
     message_t msg;
     struct sockaddr_in toaddr;
+    int i;
+
+#ifdef ALLOW_PARTITION
+
+    int partition_count = 0;
+    struct in_addr dest;
+    char dest_ip[22];
+
+#endif
 
     memset(&toaddr, 0, sizeof(struct sockaddr_in));
     toaddr.sin_family = AF_INET;
@@ -404,14 +422,36 @@ int send_udp_mesh(comm_t *comm, uint32_t id, int sock) {
     message_to_nbo(&msg);
 
     /* Iterate though and send update to all remote limiters in our identity. */
-    map_reset_iterate(comm->remote_node_map);
-    while ((remote = map_next(comm->remote_node_map))) {
+    for (i = 0; i < comm->remote_node_count; ++i) {
+        remote = &comm->remote_limiters[i];
+
+#ifdef ALLOW_PARTITION
+
+        if (do_partition) {
+            printlog(LOG_DEBUG, "Testing partition, partition set is %x, count is %d, test is %d.\n",
+                     partition_set, partition_count, partition_set & (1 << partition_count));
+            /* If the partition count bit isn't high in the set, don't actually send anything. */
+            if ((partition_set & (1 << partition_count)) == 0) {
+                dest.s_addr = ntohl(remote->addr);
+                strcpy(dest_ip, inet_ntoa(dest));
+
+                printlog(LOG_DEBUG, "Partition: ignoring host %s\n", dest_ip);
+
+                partition_count += 1;
+                continue;
+            }
+        }
+
+        partition_count += 1;
+
+#endif
+
         toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
         toaddr.sin_port = remote->port;
         if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
-            printlog(LOG_CRITICAL, "ERR: limiter_send_mesh: sento failed.\n");
+            printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
             result = errno;
-            printlog(LOG_CRITICAL, "  - The error was |%d|\n", strerror(result));
+            printlog(LOG_WARN, "  - The error was |%d|\n", strerror(result));
             break;
         }
     }
@@ -421,11 +461,19 @@ int send_udp_mesh(comm_t *comm, uint32_t id, int sock) {
 
 int send_udp_gossip(comm_t *comm, uint32_t id, int sock) {
     int i, j, targetid;
+    int awol_threshold = GOSSIP_REMOTE_AWOL_THRESHOLD;
+    int rand_count; //HACK...
     int result = 0;
     remote_limiter_t *remote;
     struct sockaddr_in toaddr;
     double msg_value, msg_weight;
 
+    /* This is the factor for the portion of value/weight to keep locally.
+     * Normally this is 1, meaning that we retain the same amount of value/weight
+     * that was sent to the peers.  In the case of not being able to send to a
+     * peer though, we increment this to reclaim the value/weight locally. */
+    int message_portion = 1;
+    
     memset(&toaddr, 0, sizeof(struct sockaddr_in));
     toaddr.sin_family = AF_INET;
 
@@ -435,27 +483,49 @@ int send_udp_gossip(comm_t *comm, uint32_t id, int sock) {
     for (i = 0; i < comm->gossip.gossip_branch; ++i) {
         message_t msg;
 
+        printlog(LOG_DEBUG, "Gossip loop iteration, i=%d, branch=%d\n", i, comm->gossip.gossip_branch);
+
         if (comm->retrys[i] >= 0) {
             remote = &comm->remote_limiters[comm->retrys[i]];
             targetid = comm->retrys[i];
-            //printf("%d:d:%d, ", i, comm->retrys[i]);
+
+            if (remote->awol > awol_threshold) {
+                message_portion += 1;
+                printlog(LOG_DEBUG, "Gossip: Ignoring AWOL peer id %d.\n", comm->retrys[i]);
+                comm->retrys[i] = -1;
+                continue;
+            }
         } else {
             targetid = -2;
+            rand_count = 0;
 
-            while (targetid == -2) {
+            while (targetid == -2 && rand_count < 50) {
                 targetid = myrand() % comm->remote_node_count;
+                rand_count += 1;
 
+                /* Don't select an already-used index. */
                 for (j = 0; j < comm->gossip.gossip_branch; ++j) {
-                    if (targetid == comm->retrys[j]) {
+                    if (targetid == comm->retrys[j] || comm->remote_limiters[targetid].awol > awol_threshold) {
+                        printlog(LOG_DEBUG, "Gossip: disqualified targetid %d.  retrys[j] is %d, and remote awol count is %d\n", targetid, comm->retrys[j], comm->remote_limiters[targetid].awol);
                         targetid = -2;
                         break;
                     }
                 }
             }
 
+            if (targetid < 0) {
+                /* Couldn't find a suitable peer to send to... */
+                message_portion += 1;
+                printlog(LOG_DEBUG, "Gossip: exhausted random peer search.\n");
+                continue;
+            } else {
+                printlog(LOG_DEBUG, "Gossip: settled on peer id %d.\n", targetid);
+            }
+
             remote = &comm->remote_limiters[targetid];
-            //printf("%d:r:%d, ", i, targetid);
         }
+        
+        comm->retrys[i] = targetid;
 
         toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
         toaddr.sin_port = remote->port;
@@ -473,20 +543,26 @@ int send_udp_gossip(comm_t *comm, uint32_t id, int sock) {
         remote->outgoing.saved_value += msg_value;
         remote->outgoing.saved_weight += msg_weight;
 
+#ifdef ALLOW_PARTITION
+
+        if (do_partition && ((partition_set & (1 << targetid)) == 0)) {
+            printlog(LOG_DEBUG, "Partition: Gossip: ignoring targetid %d\n", targetid);
+            continue;
+        }
+
+#endif
+
         message_to_nbo(&msg);
 
         if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
-            printlog(LOG_CRITICAL, "ERR: limiter_send_gossip: sento failed.\n");
+            printlog(LOG_WARN, "WARN: limiter_send_gossip: sento failed.\n");
             result = errno;
             break;
         }
-
-        comm->retrys[i] = targetid;
     }
-    //printf("\n");
 
-    comm->gossip.value = msg_value;
-    comm->gossip.weight = msg_weight;
+    comm->gossip.value = msg_value * message_portion;
+    comm->gossip.weight = msg_weight * message_portion;
 
     return result;
 }