#include "peer_comm.h"
#include "logging.h"
-/* Artifically makes a network partition. */
-int do_partition = 0;
-int partition_set = 0xfffffff;
+#define NULL_PEER (-2)
+#define MESH_REMOTE_AWOL_THRESHOLD (5)
+#define GOSSIP_REMOTE_AWOL_THRESHOLD (5)
-extern limiter_t limiter;
+/* From ulogd_DRL.c */
+extern int do_partition;
+extern int partition_set;
-static const uint32_t MAGIC_MSG = 0x123123;
-static const uint32_t MAGIC_HELLO = 0x456456;
-static const uint16_t MSG = 1;
-static const uint16_t ACK = 2;
+extern limiter_t limiter;
-static void message_to_hbo(message_t *msg) {
+void message_to_hbo(message_t *msg) {
msg->magic = ntohl(msg->magic);
msg->ident_id = ntohl(msg->ident_id);
+ /* value is a double */
+ /* weight is a double */
msg->seqno = ntohl(msg->seqno);
msg->min_seqno = ntohl(msg->min_seqno);
msg->type = ntohs(msg->type);
- /* value is a double */
- /* weight is a double */
+ /* ping_source, ping_port, check_target, and check_port stay in nbo. */
+ msg->checkack_value = ntohl(msg->checkack_value);
+ msg->update_present = ntohl(msg->update_present);
+ msg->reachability = ntohl(msg->reachability);
+ msg->incarnation = ntohl(msg->incarnation);
+ /* node has two fields, both stay in nbo. */
+ msg->view = ntohl(msg->view);
}
-static void message_to_nbo(message_t *msg) {
+void message_to_nbo(message_t *msg) {
msg->magic = htonl(msg->magic);
msg->ident_id = htonl(msg->ident_id);
+ /* value is a double */
+ /* weight is a double */
msg->seqno = htonl(msg->seqno);
msg->min_seqno = htonl(msg->min_seqno);
msg->type = htons(msg->type);
- /* value is a double */
- /* weight is a double */
-}
-
-static void hello_to_hbo(hello_t *hello) {
- hello->magic = ntohl(hello->magic);
- hello->ident_id = ntohl(hello->ident_id);
- hello->port = ntohs(hello->port);
-}
-
-static void hello_to_nbo(hello_t *hello) {
- hello->magic = htonl(hello->magic);
- hello->ident_id = htonl(hello->ident_id);
- hello->port = htons(hello->port);
-}
-
-static int is_connected(remote_limiter_t *remote) {
- struct sockaddr_in addr;
- socklen_t addrlen = sizeof(addr);
-
- if (getpeername(remote->socket, (struct sockaddr *) &addr, &addrlen) == 0)
- return 1;
- else
- return 0;
+ /* ping_source, ping_port, check_target, and check_port already in nbo. */
+ msg->checkack_value = htonl(msg->checkack_value);
+ msg->update_present = htonl(msg->update_present);
+ msg->reachability = htonl(msg->reachability);
+ msg->incarnation = htonl(msg->incarnation);
+ /* node has two fields, both already in nbo. */
+ msg->view = htonl(msg->view);
}
-static int send_ack(identity_t *ident, remote_limiter_t *remote, uint32_t seqno) {
+int send_ack(uint32_t id, remote_limiter_t *remote, uint32_t seqno, uint16_t type, int32_t view) {
int result = 0;
message_t msg;
struct sockaddr_in toaddr;
memset(&msg, 0, sizeof(msg));
msg.magic = MAGIC_MSG;
- msg.ident_id = ident->id;
- msg.type = ACK;
+ msg.ident_id = id;
+ msg.type = type;
msg.seqno = seqno;
+ msg.view = view;
message_to_nbo(&msg);
return;
}
- switch (ident->comm.comm_fabric) {
- case COMM_MESH: {
- /* Use the message's value to be our new GRDrate/FPSweight for the
- * message's sender. */
- remote->rate = msg.value;
-
- /* Reset the AWOL counter to zero since we received an update. */
- remote->awol = 0;
- }
- break;
-
- case COMM_GOSSIP: {
- if (msg.type == ACK) {
- if (msg.seqno == remote->outgoing.next_seqno - 1) {
- int i;
-
- /* Ack for most recent message. Clear saved state. */
- remote->outgoing.first_seqno = remote->outgoing.next_seqno;
- remote->outgoing.saved_value = 0;
- remote->outgoing.saved_weight = 0;
-
- for (i = 0; i < ident->comm.gossip.gossip_branch; ++i) {
- if (ident->comm.retrys[i] >= 0 &&
- remote == &ident->comm.remote_limiters[ident->comm.retrys[i]]) {
- //printf("clearing spot %d, it was %d\n", i, ident->retrys[i]);
- ident->comm.retrys[i] = -2;
- }
- }
- }
- /* Ignore ack if it isn't for most recent message. */
- } else {
- if (msg.min_seqno > remote->incoming.seen_seqno) {
- /* Entirely new information */
- remote->incoming.seen_seqno = msg.seqno;
- remote->incoming.saved_value = msg.value;
- remote->incoming.saved_weight = msg.weight;
- ident->comm.gossip.value += msg.value;
- ident->comm.gossip.weight += msg.weight;
- send_ack(ident, remote, msg.seqno);
- } else if (msg.seqno > remote->incoming.seen_seqno) {
- /* Only some of the message is old news. */
- double diff_value = msg.value - remote->incoming.saved_value;
- double diff_weight = msg.weight - remote->incoming.saved_weight;
-
- remote->incoming.seen_seqno = msg.seqno;
- remote->incoming.saved_value = msg.value;
- remote->incoming.saved_weight = msg.weight;
-
- ident->comm.gossip.value += diff_value;
- ident->comm.gossip.weight += diff_weight;
- send_ack(ident, remote, msg.seqno);
- } else {
- /* The entire message is old news. (Duplicate). */
- /* Do nothing. */
- }
- }
- }
- break;
-
- default: {
- printlog(LOG_CRITICAL, "ERR: Unknown identity comm fabric.\n");
- }
- }
-
+ /* Pass the message to the comm's recv function, which is responsible for
+ * processing its contents. */
+ ident->comm.recv_function(&ident->comm, ident->id, limiter.udp_socket, remote, &msg);
+
pthread_mutex_unlock(&ident->comm.lock);
pthread_rwlock_unlock(&limiter.limiter_lock);
}
-#if 0
-static void limiter_accept(comm_limiter_t *limiter) {
- int sock, result;
- struct sockaddr_in fromaddr;
- socklen_t fromlen = sizeof(fromaddr);
- remote_node_t sender;
- remote_limiter_t *remote;
- hello_t hello;
- comm_ident_t *ident;
- ident_handle *handle = NULL;
-
- sock = accept(limiter->tcp_socket, (struct sockaddr *)&fromaddr, &fromlen);
-
- assert(sock > 0);
-
- memset(&hello, 0, sizeof(hello_t));
- result = recv(sock, &hello, sizeof(hello_t), 0);
-
- if (result < 0) {
- close(sock);
- return; /* Failure - ignore it. */
- }
-
- assert(result == sizeof(hello_t));
-
- hello_to_hbo(&hello);
-
- assert(hello.magic == MAGIC_HELLO);
-
- memset(&sender, 0, sizeof(remote_node_t));
- sender.addr = fromaddr.sin_addr.s_addr;
- sender.port = ntohs(hello.port);
-
- pthread_testcancel();
-
- pthread_rwlock_rdlock(&limiter->rwlock);
-
- handle = map_search(limiter->ident_id_to_handle, (void *) &hello.ident_id, sizeof(hello.ident_id));
-
- if (handle == NULL) {
- printlog(LOG_WARN, "WARN:recvd hello for unknown identity.\n");
- pthread_rwlock_unlock(&limiter->rwlock);
- return;
- }
+int recv_mesh(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) {
+ /* Use the message's value to be our new GRDrate/FPSweight for the
+ * message's sender. */
+ remote->rate = msg->value;
- ident = limiter->identities[*handle];
- assert(ident != NULL);
+ /* Reset the AWOL counter to zero since we received an update. */
+ remote->awol = 0;
+ remote->reachability = REACHABLE;
- pthread_mutex_lock(&ident->lock);
-
- remote = map_search(ident->remote_node_map, &sender, sizeof(remote_node_t));
-
- if (remote == NULL) {
- printlog(LOG_WARN, "WARN: Accepted connection from unknown identity.\n");
- pthread_mutex_unlock(&ident->lock);
- pthread_rwlock_unlock(&limiter->rwlock);
- close(sock);
- return;
- }
-
- if (is_connected(remote)) {
- /* We are still connected, don't need the new socket. */
- close(sock);
- pthread_mutex_unlock(&ident->lock);
- pthread_rwlock_unlock(&limiter->rwlock);
- return;
- }
-
- /* We weren't connected, but we are now... */
- remote->socket = sock;
- printf("Got connection on: %d\n", sock);
- FD_SET(sock, &ident->fds);
-
- pthread_mutex_unlock(&ident->lock);
- pthread_rwlock_unlock(&limiter->rwlock);
+ return 0;
}
-static void read_tcp_message(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock, int sock) {
- int result;
- message_t msg;
-
- memset(&msg, 0, sizeof(message_t));
-
- result = recv(sock, &msg, sizeof(message_t), 0);
-
- if (result < 0) {
- pthread_rwlock_rdlock(limiter_rwlock);
- pthread_mutex_lock(&ident->lock);
- FD_CLR(sock, &ident->fds);
- close(sock);
- pthread_mutex_unlock(&ident->lock);
- pthread_rwlock_unlock(limiter_rwlock);
- return;
- }
-
- assert(result == sizeof(message_t));
-
- message_to_hbo(&msg);
- assert(msg.magic == MAGIC_MSG);
-
- pthread_rwlock_rdlock(limiter_rwlock);
- pthread_mutex_lock(&ident->lock);
-
- switch (ident->comm_fabric) {
- case COMM_GOSSIP: {
- ident->gossip.value += msg.value;
- ident->gossip.weight += msg.weight;
- }
- break;
-
- default: {
- assert(1 == 0); /* This case shouldn't happen. Punt for now... */
- }
- }
- pthread_mutex_unlock(&ident->lock);
- pthread_rwlock_unlock(limiter_rwlock);
-}
-
-static void ident_receive(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock) {
- int select_result, i;
- fd_set fds_copy;
- struct timeval timeout;
-
- FD_ZERO(&fds_copy);
- timeout.tv_sec = 15;
- timeout.tv_usec = 0;
-
- pthread_rwlock_rdlock(limiter_rwlock);
- pthread_mutex_lock(&ident->lock);
- memcpy(&fds_copy, &ident->fds, sizeof(fd_set));
- pthread_mutex_unlock(&ident->lock);
- pthread_rwlock_unlock(limiter_rwlock);
-
- /* mask interrupt signals for this thread? */
-
- select_result = select(FD_SETSIZE, &fds_copy, NULL, NULL, &timeout);
-
- assert(select_result >= 0);
-
- if (select_result == 0)
- return; /* Timed out */
-
- for (i = 0; (i < FD_SETSIZE) && select_result; ++i) {
- if (FD_ISSET(i, &fds_copy)) {
- read_tcp_message(ident, limiter_rwlock, i);
- select_result--;
- }
- }
-}
-#endif
-
-#define ALLOW_PARTITION
-
int send_udp_mesh(comm_t *comm, uint32_t id, int sock) {
int result = 0;
remote_limiter_t *remote;
msg.magic = MAGIC_MSG;
msg.ident_id = id;
msg.value = comm->local_rate;
+ msg.view = comm->gossip.view;
/* Do we want seqnos for mesh? We can get by without them. */
message_to_nbo(&msg);
for (i = 0; i < comm->remote_node_count; ++i) {
remote = &comm->remote_limiters[i];
+ /* Increase this counter. For mesh, it represents the number of messages we have sent to
+ * this remote limiter without having heard from it. This is reset to 0 when we receive
+ * an update from this peer. */
+ remote->awol += 1;
+ if (remote->awol > MESH_REMOTE_AWOL_THRESHOLD) {
+ remote->reachability = UNREACHABLE;
+ }
+
#ifdef ALLOW_PARTITION
if (do_partition) {
}
}
+ partition_count += 1;
+
#endif
toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
printlog(LOG_WARN, " - The error was |%d|\n", strerror(result));
break;
}
- partition_count += 1;
}
return result;
}
+int recv_gossip(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) {
+ if (msg->type == ACK) {
+ /* If ACK was received then reset the awol count */
+ if (msg->seqno == remote->outgoing.next_seqno - 1) {
+ /* Ack for most recent message. Clear saved state. */
+ remote->outgoing.first_seqno = remote->outgoing.next_seqno;
+ remote->outgoing.saved_value = 0;
+ remote->outgoing.saved_weight = 0;
+
+ remote->awol = 0;
+ }
+ /* Ignore ack if it isn't for most recent message. */
+ } else if (msg->type == MSG) {
+ if (msg->min_seqno > remote->incoming.seen_seqno) {
+ /* Entirely new information */
+ remote->incoming.seen_seqno = msg->seqno;
+ remote->incoming.saved_value = msg->value;
+ remote->incoming.saved_weight = msg->weight;
+ comm->gossip.value += msg->value;
+ comm->gossip.weight += msg->weight;
+ send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+ remote->awol = 0;
+ }
+ else if (msg->seqno > remote->incoming.seen_seqno) {
+ /* Only some of the message is old news. */
+ double diff_value = msg->value - remote->incoming.saved_value;
+ double diff_weight = msg->weight - remote->incoming.saved_weight;
+
+ remote->incoming.seen_seqno = msg->seqno;
+ remote->incoming.saved_value = msg->value;
+ remote->incoming.saved_weight = msg->weight;
+
+ comm->gossip.value += diff_value;
+ comm->gossip.weight += diff_weight;
+ send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+ remote->awol = 0;
+ }
+ else {
+ /* The entire message is old news. (Duplicate). */
+ /* Do nothing. */
+ }
+ }
+
+ return 0;
+}
+
+int find_gossip_target(comm_t *comm) {
+ int target = NULL_PEER;
+ int k;
+
+ if (comm->shuffle_index < comm->remote_node_count) {
+ target = comm->indices[comm->shuffle_index];
+ printlog(LOG_DEBUG,"GOSSIP: found index %d.\n", target);
+ comm->shuffle_index++;
+ }
+ else {
+ // shuffle the remote_limiters array
+ printlog(LOG_DEBUG, "GOSSIP: shuffling the array.\n");
+ for ( k = 0; k < comm->remote_node_count; k++) {
+ uint32_t l = myrand() % comm->remote_node_count;
+ int t;
+ t = comm->indices[l];
+ comm->indices[l] = comm->indices[k];
+ comm->indices[k] = t;
+ }
+ comm->shuffle_index = 0;
+ target = comm->indices[comm->shuffle_index];
+ printlog(LOG_DEBUG,"GOSSIP: found index after spilling over %d.\n", target);
+ comm->shuffle_index++;
+ }
+ return target;
+}
+
int send_udp_gossip(comm_t *comm, uint32_t id, int sock) {
- int i, j, targetid;
+ int i, j;
+ int retry_index = 0;
int result = 0;
remote_limiter_t *remote;
struct sockaddr_in toaddr;
double msg_value, msg_weight;
+ /* This is the factor for the portion of value/weight to keep locally.
+ * Normally this is 1, meaning that we retain the same amount of value/weight
+ * that was sent to the peers. In the case of not being able to send to a
+ * peer though, we increment this to reclaim the value/weight locally. */
+ int message_portion = 1;
+
memset(&toaddr, 0, sizeof(struct sockaddr_in));
toaddr.sin_family = AF_INET;
msg_weight = comm->gossip.weight / (comm->gossip.gossip_branch + 1);
for (i = 0; i < comm->gossip.gossip_branch; ++i) {
+ int targetid = NULL_PEER;
+ int rand_count = 0;
message_t msg;
printlog(LOG_DEBUG, "Gossip loop iteration, i=%d, branch=%d\n", i, comm->gossip.gossip_branch);
- if (comm->retrys[i] >= 0) {
- remote = &comm->remote_limiters[comm->retrys[i]];
- targetid = comm->retrys[i];
- //printf("%d:d:%d, ", i, comm->retrys[i]);
- } else {
- targetid = -2;
+ /* If there are any peers with unacked messages, select them first. */
+ while (retry_index < comm->remote_node_count) {
+ if (comm->remote_limiters[retry_index].awol > 0 && comm->remote_limiters[retry_index].reachability == REACHABLE) {
+ targetid = retry_index;
+ printlog(LOG_DEBUG, "GOSSIP: Selected peerindex %d because it had unacked messages.\n", targetid);
+ }
+
+ retry_index += 1;
+ }
+
+ while (targetid == NULL_PEER && rand_count < 10) {
+ /* Select a recipient from a randomly-shuffled array. */
+ targetid = find_gossip_target(comm);
- while (targetid == -2) {
- targetid = myrand() % comm->remote_node_count;
+ assert(targetid != NULL_PEER);
+
+ /* Don't select an already-used index. */
+ for (j = 0; j < i; ++j) {
+ if (targetid == comm->selected[j]) {
+ printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d. selected[j=%d] is %d\n", targetid, j, comm->selected[j]);
+ targetid = NULL_PEER;
+ break;
+ }
+ }
- for (j = 0; j < comm->gossip.gossip_branch; ++j) {
- if (targetid == comm->retrys[j]) {
- targetid = -2;
- break;
- }
+ /* Don't select an unreachable peer or one that is not in our view. */
+ if (targetid != NULL_PEER) {
+ if (comm->remote_limiters[targetid].reachability != REACHABLE ||
+ comm->remote_limiters[targetid].view != comm->gossip.view ||
+ comm->remote_limiters[targetid].view_confidence != IN) {
+ printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d, reachability is %d, remote view is %d (confidence:%d), my view is %d\n",
+ targetid, comm->remote_limiters[targetid].reachability, comm->remote_limiters[targetid].view,
+ comm->remote_limiters[targetid].view_confidence, comm->gossip.view);
+ targetid = NULL_PEER;
}
}
- remote = &comm->remote_limiters[targetid];
- //printf("%d:r:%d, ", i, targetid);
+ rand_count++;
}
+ if (targetid == NULL_PEER) {
+ /* Couldn't find a suitable peer to send to... */
+ message_portion += 1;
+ printlog(LOG_DEBUG, "GOSSIP: exhausted random peer search.\n");
+ continue;
+ } else {
+ printlog(LOG_DEBUG, "GOSSIP: settled on peer id %d.\n", targetid);
+ }
+
+ remote = &comm->remote_limiters[targetid];
+ comm->selected[i] = targetid;
+
toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
toaddr.sin_port = remote->port;
msg.seqno = remote->outgoing.next_seqno;
msg.min_seqno = remote->outgoing.first_seqno;
msg.type = MSG;
-
+ msg.view = comm->gossip.view;
+
remote->outgoing.next_seqno++;
remote->outgoing.saved_value += msg_value;
remote->outgoing.saved_weight += msg_weight;
+ /* Represents the number of messages we have sent to this peer without hearing a message or ACK from it. */
+ remote->awol += 1;
+
+
+#ifdef ALLOW_PARTITION
+
+ if (do_partition && ((partition_set & (1 << targetid)) == 0)) {
+ printlog(LOG_DEBUG, "Partition: GOSSIP: ignoring targetid %d\n", targetid);
+ continue;
+ }
+
+#endif
message_to_nbo(&msg);
result = errno;
break;
}
-
- comm->retrys[i] = targetid;
}
- //printf("\n");
-
- comm->gossip.value = msg_value;
- comm->gossip.weight = msg_weight;
+ comm->gossip.value = msg_value * message_portion;
+ comm->gossip.weight = msg_weight * message_portion;
return result;
}
+
+/* Old TCP code. */
#if 0
+static void hello_to_hbo(hello_t *hello) {
+ hello->magic = ntohl(hello->magic);
+ hello->ident_id = ntohl(hello->ident_id);
+ hello->port = ntohs(hello->port);
+}
+
+static void hello_to_nbo(hello_t *hello) {
+ hello->magic = htonl(hello->magic);
+ hello->ident_id = htonl(hello->ident_id);
+ hello->port = htons(hello->port);
+}
+
+static int is_connected(remote_limiter_t *remote) {
+ struct sockaddr_in addr;
+ socklen_t addrlen = sizeof(addr);
+
+ if (getpeername(remote->socket, (struct sockaddr *) &addr, &addrlen) == 0)
+ return 1;
+ else
+ return 0;
+}
+
int send_tcp_gossip(comm_ident_t *ident, FILE *logfile, int unused) {
int i, targetid, sock;
int result = 0;
return result;
}
-#endif
-#if 0
void *limiter_accept_thread(void *limiter) {
sigset_t signal_mask;
}
pthread_exit(NULL);
}
+
+static void limiter_accept(comm_limiter_t *limiter) {
+ int sock, result;
+ struct sockaddr_in fromaddr;
+ socklen_t fromlen = sizeof(fromaddr);
+ remote_node_t sender;
+ remote_limiter_t *remote;
+ hello_t hello;
+ comm_ident_t *ident;
+ dent_handle *handle = NULL;
+
+ sock = accept(limiter->tcp_socket, (struct sockaddr *)&fromaddr, &fromlen);
+
+ assert(sock > 0);
+
+ memset(&hello, 0, sizeof(hello_t));
+ result = recv(sock, &hello, sizeof(hello_t), 0);
+
+ if (result < 0) {
+ close(sock);
+ return; /* Failure - ignore it. */
+ }
+
+ assert(result == sizeof(hello_t));
+
+ hello_to_hbo(&hello);
+
+ assert(hello.magic == MAGIC_HELLO);
+
+ memset(&sender, 0, sizeof(remote_node_t));
+ sender.addr = fromaddr.sin_addr.s_addr;
+ sender.port = ntohs(hello.port);
+
+ pthread_testcancel();
+
+ pthread_rwlock_rdlock(&limiter->rwlock);
+
+ handle = map_search(limiter->ident_id_to_handle, (void *) &hello.ident_id, sizeof(hello.ident_id));
+
+ if (handle == NULL) {
+ printlog(LOG_WARN, "WARN:recvd hello for unknown identity.\n");
+ pthread_rwlock_unlock(&limiter->rwlock);
+ return;
+ }
+
+ ident = limiter->identities[*handle];
+ assert(ident != NULL);
+
+ pthread_mutex_lock(&ident->lock);
+
+ remote = map_search(ident->remote_node_map, &sender, sizeof(remote_node_t));
+
+ if (remote == NULL) {
+ printlog(LOG_WARN, "WARN: Accepted connection from unknown identity.\n");
+ pthread_mutex_unlock(&ident->lock);
+ pthread_rwlock_unlock(&limiter->rwlock);
+ close(sock);
+ return;
+ }
+
+ if (is_connected(remote)) {
+ /* We are still connected, don't need the new socket. */
+ close(sock);
+ pthread_mutex_unlock(&ident->lock);
+ pthread_rwlock_unlock(&limiter->rwlock);
+ return;
+ }
+
+ /* We weren't connected, but we are now... */
+ remote->socket = sock;
+ printf("Got connection on: %d\n", sock);
+ FD_SET(sock, &ident->fds);
+
+ pthread_mutex_unlock(&ident->lock);
+ pthread_rwlock_unlock(&limiter->rwlock);
+}
+
+static void read_tcp_message(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock, int sock) {
+ int result;
+ message_t msg;
+
+ memset(&msg, 0, sizeof(message_t));
+
+ result = recv(sock, &msg, sizeof(message_t), 0);
+
+ if (result < 0) {
+ pthread_rwlock_rdlock(limiter_rwlock);
+ pthread_mutex_lock(&ident->lock);
+ FD_CLR(sock, &ident->fds);
+ close(sock);
+ pthread_mutex_unlock(&ident->lock);
+ pthread_rwlock_unlock(limiter_rwlock);
+ return;
+ }
+
+ assert(result == sizeof(message_t));
+
+ message_to_hbo(&msg);
+ assert(msg.magic == MAGIC_MSG);
+
+ pthread_rwlock_rdlock(limiter_rwlock);
+ pthread_mutex_lock(&ident->lock);
+
+ switch (ident->comm_fabric) {
+ case COMM_GOSSIP: {
+ ident->gossip.value += msg.value;
+ ident->gossip.weight += msg.weight;
+ }
+ break;
+
+ default: {
+ assert(1 == 0); /* This case shouldn't happen. Punt for now... */
+ }
+ }
+ pthread_mutex_unlock(&ident->lock);
+ pthread_rwlock_unlock(limiter_rwlock);
+}
+
+static void ident_receive(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock) {
+ int select_result, i;
+ fd_set fds_copy;
+ struct timeval timeout;
+
+ FD_ZERO(&fds_copy);
+ timeout.tv_sec = 15;
+ timeout.tv_usec = 0;
+
+ pthread_rwlock_rdlock(limiter_rwlock);
+ pthread_mutex_lock(&ident->lock);
+ memcpy(&fds_copy, &ident->fds, sizeof(fd_set));
+ pthread_mutex_unlock(&ident->lock);
+ pthread_rwlock_unlock(limiter_rwlock);
+
+ /* mask interrupt signals for this thread? */
+
+ select_result = select(FD_SETSIZE, &fds_copy, NULL, NULL, &timeout);
+
+ assert(select_result >= 0);
+
+ if (select_result == 0)
+ return; /* Timed out */
+
+ for (i = 0; (i < FD_SETSIZE) && select_result; ++i) {
+ if (FD_ISSET(i, &fds_copy)) {
+ read_tcp_message(ident, limiter_rwlock, i);
+ select_result--;
+ }
+ }
+}
#endif