Updates to autotools for library detection
[distributedratelimiting.git] / drl / peer_comm.c
index cb30657..fc2dc86 100644 (file)
 #include "peer_comm.h"
 #include "logging.h"
 
-extern limiter_t limiter;
+#define NULL_PEER (-2)
+#define MESH_REMOTE_AWOL_THRESHOLD (5)
+#define GOSSIP_REMOTE_AWOL_THRESHOLD (5)
+
+/* From ulogd_DRL.c */
+extern int do_partition;
+extern int partition_set;
 
-static const uint32_t MAGIC_MSG = 0x123123;
-static const uint32_t MAGIC_HELLO = 0x456456;
-static const uint16_t MSG = 1;
-static const uint16_t ACK = 2;
+extern limiter_t limiter;
 
-static void message_to_hbo(message_t *msg) {
+void message_to_hbo(message_t *msg) {
     msg->magic = ntohl(msg->magic);
     msg->ident_id = ntohl(msg->ident_id);
+    /* value is a double */
+    /* weight is a double */
     msg->seqno = ntohl(msg->seqno);
     msg->min_seqno = ntohl(msg->min_seqno);
     msg->type = ntohs(msg->type);
-    /* value is a double */
-    /* weight is a double */
+    /* ping_source, ping_port, check_target, and check_port stay in nbo. */
+    msg->checkack_value = ntohl(msg->checkack_value);
+    msg->update_present = ntohl(msg->update_present);
+    msg->reachability = ntohl(msg->reachability);
+    msg->incarnation = ntohl(msg->incarnation);
+    /* node has two fields, both stay in nbo. */
+    msg->view = ntohl(msg->view);
 }
 
-static void message_to_nbo(message_t *msg) {
+void message_to_nbo(message_t *msg) {
     msg->magic = htonl(msg->magic);
     msg->ident_id = htonl(msg->ident_id);
+    /* value is a double */
+    /* weight is a double */
     msg->seqno = htonl(msg->seqno);
     msg->min_seqno = htonl(msg->min_seqno);
     msg->type = htons(msg->type);
-    /* value is a double */
-    /* weight is a double */
-}
-
-static void hello_to_hbo(hello_t *hello) {
-    hello->magic = ntohl(hello->magic);
-    hello->ident_id = ntohl(hello->ident_id);
-    hello->port = ntohs(hello->port);
-}
-
-static void hello_to_nbo(hello_t *hello) {
-    hello->magic = htonl(hello->magic);
-    hello->ident_id = htonl(hello->ident_id);
-    hello->port = htons(hello->port);
-}
-
-static int is_connected(remote_limiter_t *remote) {
-    struct sockaddr_in addr;
-    socklen_t addrlen = sizeof(addr);
-
-    if (getpeername(remote->socket, (struct sockaddr *) &addr, &addrlen) == 0)
-        return 1;
-    else
-        return 0;
+    /* ping_source, ping_port, check_target, and check_port already in nbo. */
+    msg->checkack_value = htonl(msg->checkack_value);
+    msg->update_present = htonl(msg->update_present);
+    msg->reachability = htonl(msg->reachability);
+    msg->incarnation = htonl(msg->incarnation);
+    /* node has two fields, both already in nbo. */
+    msg->view = htonl(msg->view);
 }
 
-static int send_ack(identity_t *ident, remote_limiter_t *remote, uint32_t seqno) {
+int send_ack(uint32_t id, remote_limiter_t *remote, uint32_t seqno, uint16_t type, int32_t view) {
     int result = 0;
     message_t msg;
     struct sockaddr_in toaddr;
@@ -104,9 +99,10 @@ static int send_ack(identity_t *ident, remote_limiter_t *remote, uint32_t seqno)
 
     memset(&msg, 0, sizeof(msg));
     msg.magic = MAGIC_MSG;
-    msg.ident_id = ident->id;
-    msg.type = ACK;
+    msg.ident_id = id;
+    msg.type = type;
     msg.seqno = seqno;
+    msg.view = view;
 
     message_to_nbo(&msg);
 
@@ -167,230 +163,40 @@ void limiter_receive() {
         return;
     }
 
-    switch (ident->comm.comm_fabric) {
-        case COMM_MESH: {
-            /* Use the message's value to be our new GRDrate/FPSweight for the
-             * message's sender. */
-            remote->rate = msg.value;
-
-            /* Reset the AWOL counter to zero since we received an update. */
-            remote->awol = 0;
-        }
-        break;
-
-        case COMM_GOSSIP: {
-            if (msg.type == ACK) {
-                if (msg.seqno == remote->outgoing.next_seqno - 1) {
-                    int i;
-
-                    /* Ack for most recent message.  Clear saved state. */
-                    remote->outgoing.first_seqno = remote->outgoing.next_seqno;
-                    remote->outgoing.saved_value = 0;
-                    remote->outgoing.saved_weight = 0;
-
-                    for (i = 0; i < ident->comm.gossip.gossip_branch; ++i) {
-                        if (ident->comm.retrys[i] >= 0 &&
-                            remote == &ident->comm.remote_limiters[ident->comm.retrys[i]]) {
-                                //printf("clearing spot %d, it was %d\n", i, ident->retrys[i]);
-                                ident->comm.retrys[i] = -2;
-                        }
-                    }
-                }
-                /* Ignore ack if it isn't for most recent message. */
-            } else {
-                if (msg.min_seqno > remote->incoming.seen_seqno) {
-                    /* Entirely new information */
-                    remote->incoming.seen_seqno = msg.seqno;
-                    remote->incoming.saved_value = msg.value;
-                    remote->incoming.saved_weight = msg.weight;
-                    ident->comm.gossip.value += msg.value;
-                    ident->comm.gossip.weight += msg.weight;
-                    send_ack(ident, remote, msg.seqno);
-                } else if (msg.seqno > remote->incoming.seen_seqno) {
-                    /* Only some of the message is old news. */
-                    double diff_value = msg.value - remote->incoming.saved_value;
-                    double diff_weight = msg.weight - remote->incoming.saved_weight;
-
-                    remote->incoming.seen_seqno = msg.seqno;
-                    remote->incoming.saved_value = msg.value;
-                    remote->incoming.saved_weight = msg.weight;
-
-                    ident->comm.gossip.value += diff_value;
-                    ident->comm.gossip.weight += diff_weight;
-                    send_ack(ident, remote, msg.seqno);
-                } else {
-                    /* The entire message is old news. (Duplicate). */
-                    /* Do nothing. */
-                }
-            }
-        }
-        break;
-
-        default: {
-            printlog(LOG_CRITICAL, "ERR: Unknown identity comm fabric.\n");
-        }
-    }
-
+    /* Pass the message to the comm's recv function, which is responsible for
+     * processing its contents. */
+    ident->comm.recv_function(&ident->comm, ident->id, limiter.udp_socket, remote, &msg);
+    
     pthread_mutex_unlock(&ident->comm.lock);
     pthread_rwlock_unlock(&limiter.limiter_lock);
 }
 
-#if 0
-static void limiter_accept(comm_limiter_t *limiter) {
-    int sock, result;
-    struct sockaddr_in fromaddr;
-    socklen_t fromlen = sizeof(fromaddr);
-    remote_node_t sender;
-    remote_limiter_t *remote;
-    hello_t hello;
-    comm_ident_t *ident;
-    ident_handle *handle = NULL;
-
-    sock = accept(limiter->tcp_socket, (struct sockaddr *)&fromaddr, &fromlen);
-
-    assert(sock > 0);
+int recv_mesh(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) {
+    /* Use the message's value to be our new GRDrate/FPSweight for the
+     * message's sender. */
+    remote->rate = msg->value;
 
-    memset(&hello, 0, sizeof(hello_t));
-    result = recv(sock, &hello, sizeof(hello_t), 0);
-
-    if (result < 0) {
-        close(sock);
-        return; /* Failure - ignore it. */
-    }
-
-    assert(result == sizeof(hello_t));
-
-    hello_to_hbo(&hello);
-
-    assert(hello.magic == MAGIC_HELLO);
-
-    memset(&sender, 0, sizeof(remote_node_t));
-    sender.addr = fromaddr.sin_addr.s_addr;
-    sender.port = ntohs(hello.port);
+    /* Reset the AWOL counter to zero since we received an update. */
+    remote->awol = 0;
+    remote->reachability = REACHABLE;
 
-    pthread_testcancel();
-
-    pthread_rwlock_rdlock(&limiter->rwlock);
-
-    handle = map_search(limiter->ident_id_to_handle, (void *) &hello.ident_id, sizeof(hello.ident_id));
-
-    if (handle == NULL) {
-        printlog(LOG_WARN, "WARN:recvd hello for unknown identity.\n");
-        pthread_rwlock_unlock(&limiter->rwlock);
-        return;
-    }
-
-    ident = limiter->identities[*handle];
-    assert(ident != NULL);
-
-    pthread_mutex_lock(&ident->lock);
-
-    remote = map_search(ident->remote_node_map, &sender, sizeof(remote_node_t));
-
-    if (remote == NULL) {
-        printlog(LOG_WARN, "WARN: Accepted connection from unknown identity.\n");
-        pthread_mutex_unlock(&ident->lock);
-        pthread_rwlock_unlock(&limiter->rwlock);
-        close(sock);
-        return;
-    }
-
-    if (is_connected(remote)) {
-        /* We are still connected, don't need the new socket. */
-        close(sock);
-        pthread_mutex_unlock(&ident->lock);
-        pthread_rwlock_unlock(&limiter->rwlock);
-        return;
-    }
-
-    /* We weren't connected, but we are now... */
-    remote->socket = sock;
-    printf("Got connection on: %d\n", sock);
-    FD_SET(sock, &ident->fds);
-
-    pthread_mutex_unlock(&ident->lock);
-    pthread_rwlock_unlock(&limiter->rwlock);
-}
-
-static void read_tcp_message(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock, int sock) {
-    int result;
-    message_t msg;
-
-    memset(&msg, 0, sizeof(message_t));
-
-    result = recv(sock, &msg, sizeof(message_t), 0);
-
-    if (result < 0) {
-        pthread_rwlock_rdlock(limiter_rwlock);
-        pthread_mutex_lock(&ident->lock);
-        FD_CLR(sock, &ident->fds);
-        close(sock);
-        pthread_mutex_unlock(&ident->lock);
-        pthread_rwlock_unlock(limiter_rwlock);
-        return;
-    }
-
-    assert(result == sizeof(message_t));
-
-    message_to_hbo(&msg);
-    assert(msg.magic == MAGIC_MSG);
-
-    pthread_rwlock_rdlock(limiter_rwlock);
-    pthread_mutex_lock(&ident->lock);
-
-    switch (ident->comm_fabric) {
-        case COMM_GOSSIP: {
-            ident->gossip.value += msg.value;
-            ident->gossip.weight += msg.weight;
-        }
-        break;
-
-        default: {
-            assert(1 == 0); /* This case shouldn't happen. Punt for now... */
-        }
-    }
-    pthread_mutex_unlock(&ident->lock);
-    pthread_rwlock_unlock(limiter_rwlock);
+    return 0;
 }
 
-static void ident_receive(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock) {
-    int select_result, i;
-    fd_set fds_copy;
-    struct timeval timeout;
-
-    FD_ZERO(&fds_copy);
-    timeout.tv_sec = 15;
-    timeout.tv_usec = 0;
-
-    pthread_rwlock_rdlock(limiter_rwlock);
-    pthread_mutex_lock(&ident->lock);
-    memcpy(&fds_copy, &ident->fds, sizeof(fd_set));
-    pthread_mutex_unlock(&ident->lock);
-    pthread_rwlock_unlock(limiter_rwlock);
-    
-    /* mask interrupt signals for this thread? */
-
-    select_result = select(FD_SETSIZE, &fds_copy, NULL, NULL, &timeout);
-
-    assert(select_result >= 0);
-    
-    if (select_result == 0)
-        return; /* Timed out */
-
-    for (i = 0; (i < FD_SETSIZE) && select_result; ++i) {
-        if (FD_ISSET(i, &fds_copy)) {
-            read_tcp_message(ident, limiter_rwlock, i);
-            select_result--;
-        }
-    }
-}
-#endif
-
 int send_udp_mesh(comm_t *comm, uint32_t id, int sock) {
     int result = 0;
     remote_limiter_t *remote;
     message_t msg;
     struct sockaddr_in toaddr;
+    int i;
+
+#ifdef ALLOW_PARTITION
+
+    int partition_count = 0;
+    struct in_addr dest;
+    char dest_ip[22];
+
+#endif
 
     memset(&toaddr, 0, sizeof(struct sockaddr_in));
     toaddr.sin_family = AF_INET;
@@ -399,19 +205,50 @@ int send_udp_mesh(comm_t *comm, uint32_t id, int sock) {
     msg.magic = MAGIC_MSG;
     msg.ident_id = id;
     msg.value = comm->local_rate;
+    msg.view = comm->gossip.view;
     /* Do we want seqnos for mesh?  We can get by without them. */
 
     message_to_nbo(&msg);
 
     /* Iterate though and send update to all remote limiters in our identity. */
-    map_reset_iterate(comm->remote_node_map);
-    while ((remote = map_next(comm->remote_node_map))) {
+    for (i = 0; i < comm->remote_node_count; ++i) {
+        remote = &comm->remote_limiters[i];
+
+        /* Increase this counter.  For mesh, it represents the number of messages we have sent to
+         * this remote limiter without having heard from it.  This is reset to 0 when we receive
+         * an update from this peer. */
+        remote->awol += 1;
+        if (remote->awol > MESH_REMOTE_AWOL_THRESHOLD) {
+            remote->reachability = UNREACHABLE;
+        }
+
+#ifdef ALLOW_PARTITION
+
+        if (do_partition) {
+            printlog(LOG_DEBUG, "Testing partition, partition set is %x, count is %d, test is %d.\n",
+                     partition_set, partition_count, partition_set & (1 << partition_count));
+            /* If the partition count bit isn't high in the set, don't actually send anything. */
+            if ((partition_set & (1 << partition_count)) == 0) {
+                dest.s_addr = ntohl(remote->addr);
+                strcpy(dest_ip, inet_ntoa(dest));
+
+                printlog(LOG_DEBUG, "Partition: ignoring host %s\n", dest_ip);
+
+                partition_count += 1;
+                continue;
+            }
+        }
+
+        partition_count += 1;
+
+#endif
+
         toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
         toaddr.sin_port = remote->port;
         if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
-            printlog(LOG_CRITICAL, "ERR: limiter_send_mesh: sento failed.\n");
+            printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
             result = errno;
-            printlog(LOG_CRITICAL, "  - The error was |%d|\n", strerror(result));
+            printlog(LOG_WARN, "  - The error was |%d|\n", strerror(result));
             break;
         }
     }
@@ -419,13 +256,93 @@ int send_udp_mesh(comm_t *comm, uint32_t id, int sock) {
     return result;
 }
 
+int recv_gossip(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) {
+    if (msg->type == ACK) {
+        /* If ACK was received then reset the awol count */
+        if (msg->seqno == remote->outgoing.next_seqno - 1) {
+            /* Ack for most recent message.  Clear saved state. */
+            remote->outgoing.first_seqno = remote->outgoing.next_seqno;
+            remote->outgoing.saved_value = 0;
+            remote->outgoing.saved_weight = 0;
+
+            remote->awol = 0;
+        }
+        /* Ignore ack if it isn't for most recent message. */
+    } else if (msg->type == MSG) {
+        if (msg->min_seqno > remote->incoming.seen_seqno) {
+            /* Entirely new information */
+            remote->incoming.seen_seqno = msg->seqno;
+            remote->incoming.saved_value = msg->value;
+            remote->incoming.saved_weight = msg->weight;
+            comm->gossip.value += msg->value;
+            comm->gossip.weight += msg->weight;
+            send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+            remote->awol = 0;
+        } 
+        else if (msg->seqno > remote->incoming.seen_seqno) {
+            /* Only some of the message is old news. */
+            double diff_value = msg->value - remote->incoming.saved_value;
+            double diff_weight = msg->weight - remote->incoming.saved_weight;
+
+            remote->incoming.seen_seqno = msg->seqno;
+            remote->incoming.saved_value = msg->value;
+            remote->incoming.saved_weight = msg->weight;
+
+            comm->gossip.value += diff_value;
+            comm->gossip.weight += diff_weight;
+            send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+            remote->awol = 0;
+        } 
+        else {
+            /* The entire message is old news. (Duplicate). */
+            /* Do nothing. */
+        }
+    }
+
+    return 0;
+}
+
+int find_gossip_target(comm_t *comm) {
+    int target = NULL_PEER;
+    int k;
+
+    if (comm->shuffle_index < comm->remote_node_count) {
+        target = comm->indices[comm->shuffle_index];
+        printlog(LOG_DEBUG,"GOSSIP: found index %d.\n", target);
+        comm->shuffle_index++;
+    }
+    else {
+        // shuffle the remote_limiters array
+        printlog(LOG_DEBUG, "GOSSIP: shuffling the array.\n");
+        for ( k = 0; k < comm->remote_node_count; k++) {
+            uint32_t l = myrand() % comm->remote_node_count;
+            int t;
+            t = comm->indices[l];
+            comm->indices[l] = comm->indices[k];
+            comm->indices[k] = t;
+        }
+        comm->shuffle_index = 0;
+        target = comm->indices[comm->shuffle_index];
+        printlog(LOG_DEBUG,"GOSSIP: found index after spilling over %d.\n", target);
+        comm->shuffle_index++;
+    }
+    return target;
+}
+
 int send_udp_gossip(comm_t *comm, uint32_t id, int sock) {
-    int i, j, targetid;
+    int i, j;
+    int retry_index = 0;
     int result = 0;
     remote_limiter_t *remote;
     struct sockaddr_in toaddr;
     double msg_value, msg_weight;
 
+    /* This is the factor for the portion of value/weight to keep locally.
+     * Normally this is 1, meaning that we retain the same amount of value/weight
+     * that was sent to the peers.  In the case of not being able to send to a
+     * peer though, we increment this to reclaim the value/weight locally. */
+    int message_portion = 1;
+
     memset(&toaddr, 0, sizeof(struct sockaddr_in));
     toaddr.sin_family = AF_INET;
 
@@ -433,30 +350,64 @@ int send_udp_gossip(comm_t *comm, uint32_t id, int sock) {
     msg_weight = comm->gossip.weight / (comm->gossip.gossip_branch + 1);
 
     for (i = 0; i < comm->gossip.gossip_branch; ++i) {
+        int targetid = NULL_PEER;
+        int rand_count = 0;
         message_t msg;
 
-        if (comm->retrys[i] >= 0) {
-            remote = &comm->remote_limiters[comm->retrys[i]];
-            targetid = comm->retrys[i];
-            //printf("%d:d:%d, ", i, comm->retrys[i]);
-        } else {
-            targetid = -2;
+        printlog(LOG_DEBUG, "Gossip loop iteration, i=%d, branch=%d\n", i, comm->gossip.gossip_branch);
 
-            while (targetid == -2) {
-                targetid = myrand() % comm->remote_node_count;
+        /* If there are any peers with unacked messages, select them first. */
+        while (retry_index < comm->remote_node_count) {
+            if (comm->remote_limiters[retry_index].awol > 0 && comm->remote_limiters[retry_index].reachability == REACHABLE) {
+                targetid = retry_index;
+                printlog(LOG_DEBUG, "GOSSIP: Selected peerindex %d because it had unacked messages.\n", targetid);
+            }
+
+            retry_index += 1;
+        }
 
-                for (j = 0; j < comm->gossip.gossip_branch; ++j) {
-                    if (targetid == comm->retrys[j]) {
-                        targetid = -2;
-                        break;
-                    }
+        while (targetid == NULL_PEER && rand_count < 10) {
+            /* Select a recipient from a randomly-shuffled array. */
+            targetid = find_gossip_target(comm);
+
+            assert(targetid != NULL_PEER);
+
+            /* Don't select an already-used index. */
+            for (j = 0; j < i; ++j) {
+                if (targetid == comm->selected[j]) {
+                    printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d.  selected[j=%d] is %d\n", targetid, j, comm->selected[j]); 
+                    targetid = NULL_PEER;
+                    break;
+                }
+            }
+
+            /* Don't select an unreachable peer or one that is not in our view. */
+            if (targetid != NULL_PEER) {
+                if (comm->remote_limiters[targetid].reachability != REACHABLE ||
+                        comm->remote_limiters[targetid].view != comm->gossip.view ||
+                        comm->remote_limiters[targetid].view_confidence != IN) {
+                    printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d, reachability is %d, remote view is %d (confidence:%d), my view is %d\n",
+                            targetid, comm->remote_limiters[targetid].reachability, comm->remote_limiters[targetid].view,
+                            comm->remote_limiters[targetid].view_confidence, comm->gossip.view);
+                    targetid = NULL_PEER;
                 }
             }
 
-            remote = &comm->remote_limiters[targetid];
-            //printf("%d:r:%d, ", i, targetid);
+            rand_count++;
         }
 
+        if (targetid == NULL_PEER) {
+            /* Couldn't find a suitable peer to send to... */
+            message_portion += 1;
+            printlog(LOG_DEBUG, "GOSSIP: exhausted random peer search.\n");
+            continue;
+        } else {
+            printlog(LOG_DEBUG, "GOSSIP: settled on peer id %d.\n", targetid);
+        }
+
+        remote = &comm->remote_limiters[targetid];
+        comm->selected[i] = targetid;
+
         toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
         toaddr.sin_port = remote->port;
 
@@ -468,30 +419,63 @@ int send_udp_gossip(comm_t *comm, uint32_t id, int sock) {
         msg.seqno = remote->outgoing.next_seqno;
         msg.min_seqno = remote->outgoing.first_seqno;
         msg.type = MSG;
-
+        msg.view = comm->gossip.view;
+        
         remote->outgoing.next_seqno++;
         remote->outgoing.saved_value += msg_value;
         remote->outgoing.saved_weight += msg_weight;
+        /* Represents the number of messages we have sent to this peer without hearing a message or ACK from it. */
+        remote->awol += 1;
+
+
+#ifdef ALLOW_PARTITION
+
+        if (do_partition && ((partition_set & (1 << targetid)) == 0)) {
+            printlog(LOG_DEBUG, "Partition: GOSSIP: ignoring targetid %d\n", targetid);
+            continue;
+        }
+
+#endif
 
         message_to_nbo(&msg);
 
         if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
-            printlog(LOG_CRITICAL, "ERR: limiter_send_gossip: sento failed.\n");
+            printlog(LOG_WARN, "WARN: limiter_send_gossip: sento failed.\n");
             result = errno;
             break;
         }
-
-        comm->retrys[i] = targetid;
     }
-    //printf("\n");
-
-    comm->gossip.value = msg_value;
-    comm->gossip.weight = msg_weight;
+    comm->gossip.value = msg_value * message_portion;
+    comm->gossip.weight = msg_weight * message_portion;
 
     return result;
 }
 
+
+/* Old TCP code. */
 #if 0
+static void hello_to_hbo(hello_t *hello) {
+    hello->magic = ntohl(hello->magic);
+    hello->ident_id = ntohl(hello->ident_id);
+    hello->port = ntohs(hello->port);
+}
+
+static void hello_to_nbo(hello_t *hello) {
+    hello->magic = htonl(hello->magic);
+    hello->ident_id = htonl(hello->ident_id);
+    hello->port = htons(hello->port);
+}
+
+static int is_connected(remote_limiter_t *remote) {
+    struct sockaddr_in addr;
+    socklen_t addrlen = sizeof(addr);
+
+    if (getpeername(remote->socket, (struct sockaddr *) &addr, &addrlen) == 0)
+        return 1;
+    else
+        return 0;
+}
+
 int send_tcp_gossip(comm_ident_t *ident, FILE *logfile, int unused) {
     int i, targetid, sock;
     int result = 0;
@@ -525,9 +509,7 @@ int send_tcp_gossip(comm_ident_t *ident, FILE *logfile, int unused) {
 
     return result;
 }
-#endif
 
-#if 0
 void *limiter_accept_thread(void *limiter) {
     sigset_t signal_mask;
 
@@ -610,4 +592,153 @@ void *ident_receive_thread(void *recv_args) {
     }
     pthread_exit(NULL);
 }
+
+static void limiter_accept(comm_limiter_t *limiter) {
+    int sock, result;
+    struct sockaddr_in fromaddr;
+    socklen_t fromlen = sizeof(fromaddr);
+    remote_node_t sender;
+    remote_limiter_t *remote;
+    hello_t hello;
+    comm_ident_t *ident;
+    dent_handle *handle = NULL;
+
+    sock = accept(limiter->tcp_socket, (struct sockaddr *)&fromaddr, &fromlen);
+
+    assert(sock > 0);
+
+    memset(&hello, 0, sizeof(hello_t));
+    result = recv(sock, &hello, sizeof(hello_t), 0);
+
+    if (result < 0) {
+        close(sock);
+        return; /* Failure - ignore it. */
+    }
+
+    assert(result == sizeof(hello_t));
+
+    hello_to_hbo(&hello);
+
+    assert(hello.magic == MAGIC_HELLO);
+
+    memset(&sender, 0, sizeof(remote_node_t));
+    sender.addr = fromaddr.sin_addr.s_addr;
+    sender.port = ntohs(hello.port);
+
+    pthread_testcancel();
+
+    pthread_rwlock_rdlock(&limiter->rwlock);
+
+    handle = map_search(limiter->ident_id_to_handle, (void *) &hello.ident_id, sizeof(hello.ident_id));
+
+    if (handle == NULL) {
+        printlog(LOG_WARN, "WARN:recvd hello for unknown identity.\n");
+        pthread_rwlock_unlock(&limiter->rwlock);
+        return;
+    }
+
+    ident = limiter->identities[*handle];
+    assert(ident != NULL);
+
+    pthread_mutex_lock(&ident->lock);
+
+    remote = map_search(ident->remote_node_map, &sender, sizeof(remote_node_t));
+
+    if (remote == NULL) {
+        printlog(LOG_WARN, "WARN: Accepted connection from unknown identity.\n");
+        pthread_mutex_unlock(&ident->lock);
+        pthread_rwlock_unlock(&limiter->rwlock);
+        close(sock);
+        return;
+    }
+
+    if (is_connected(remote)) {
+        /* We are still connected, don't need the new socket. */
+        close(sock);
+        pthread_mutex_unlock(&ident->lock);
+        pthread_rwlock_unlock(&limiter->rwlock);
+        return;
+    }
+
+    /* We weren't connected, but we are now... */
+    remote->socket = sock;
+    printf("Got connection on: %d\n", sock);
+    FD_SET(sock, &ident->fds);
+
+    pthread_mutex_unlock(&ident->lock);
+    pthread_rwlock_unlock(&limiter->rwlock);
+}
+
+static void read_tcp_message(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock, int sock) {
+    int result;
+    message_t msg;
+
+    memset(&msg, 0, sizeof(message_t));
+
+    result = recv(sock, &msg, sizeof(message_t), 0);
+
+    if (result < 0) {
+        pthread_rwlock_rdlock(limiter_rwlock);
+        pthread_mutex_lock(&ident->lock);
+        FD_CLR(sock, &ident->fds);
+        close(sock);
+        pthread_mutex_unlock(&ident->lock);
+        pthread_rwlock_unlock(limiter_rwlock);
+        return;
+    }
+
+    assert(result == sizeof(message_t));
+
+    message_to_hbo(&msg);
+    assert(msg.magic == MAGIC_MSG);
+
+    pthread_rwlock_rdlock(limiter_rwlock);
+    pthread_mutex_lock(&ident->lock);
+
+    switch (ident->comm_fabric) {
+        case COMM_GOSSIP: {
+            ident->gossip.value += msg.value;
+            ident->gossip.weight += msg.weight;
+        }
+        break;
+
+        default: {
+            assert(1 == 0); /* This case shouldn't happen. Punt for now... */
+        }
+    }
+    pthread_mutex_unlock(&ident->lock);
+    pthread_rwlock_unlock(limiter_rwlock);
+}
+
+static void ident_receive(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock) {
+    int select_result, i;
+    fd_set fds_copy;
+    struct timeval timeout;
+
+    FD_ZERO(&fds_copy);
+    timeout.tv_sec = 15;
+    timeout.tv_usec = 0;
+
+    pthread_rwlock_rdlock(limiter_rwlock);
+    pthread_mutex_lock(&ident->lock);
+    memcpy(&fds_copy, &ident->fds, sizeof(fd_set));
+    pthread_mutex_unlock(&ident->lock);
+    pthread_rwlock_unlock(limiter_rwlock);
+    
+    /* mask interrupt signals for this thread? */
+
+    select_result = select(FD_SETSIZE, &fds_copy, NULL, NULL, &timeout);
+
+    assert(select_result >= 0);
+    
+    if (select_result == 0)
+        return; /* Timed out */
+
+    for (i = 0; (i < FD_SETSIZE) && select_result; ++i) {
+        if (FD_ISSET(i, &fds_copy)) {
+            read_tcp_message(ident, limiter_rwlock, i);
+            select_result--;
+        }
+    }
+}
 #endif