X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=drl%2Fpeer_comm.c;h=fc2dc86a28619d45c6e33bf6a166b909b237d109;hb=266bcb55dbeb328bc583f3b712e7ab90d7ff004e;hp=41ed778bdaf6bbe166035cab8d6c6bcc33e2aae8;hpb=f83340496f632165030cc92cd98408a87082f6b1;p=distributedratelimiting.git diff --git a/drl/peer_comm.c b/drl/peer_comm.c index 41ed778..fc2dc86 100644 --- a/drl/peer_comm.c +++ b/drl/peer_comm.c @@ -42,60 +42,51 @@ #include "peer_comm.h" #include "logging.h" -/* Artifically makes a network partition. */ -int do_partition = 0; -int partition_set = 0xfffffff; +#define NULL_PEER (-2) +#define MESH_REMOTE_AWOL_THRESHOLD (5) +#define GOSSIP_REMOTE_AWOL_THRESHOLD (5) -extern limiter_t limiter; +/* From ulogd_DRL.c */ +extern int do_partition; +extern int partition_set; -static const uint32_t MAGIC_MSG = 0x123123; -static const uint32_t MAGIC_HELLO = 0x456456; -static const uint16_t MSG = 1; -static const uint16_t ACK = 2; +extern limiter_t limiter; -static void message_to_hbo(message_t *msg) { +void message_to_hbo(message_t *msg) { msg->magic = ntohl(msg->magic); msg->ident_id = ntohl(msg->ident_id); + /* value is a double */ + /* weight is a double */ msg->seqno = ntohl(msg->seqno); msg->min_seqno = ntohl(msg->min_seqno); msg->type = ntohs(msg->type); - /* value is a double */ - /* weight is a double */ + /* ping_source, ping_port, check_target, and check_port stay in nbo. */ + msg->checkack_value = ntohl(msg->checkack_value); + msg->update_present = ntohl(msg->update_present); + msg->reachability = ntohl(msg->reachability); + msg->incarnation = ntohl(msg->incarnation); + /* node has two fields, both stay in nbo. */ + msg->view = ntohl(msg->view); } -static void message_to_nbo(message_t *msg) { +void message_to_nbo(message_t *msg) { msg->magic = htonl(msg->magic); msg->ident_id = htonl(msg->ident_id); + /* value is a double */ + /* weight is a double */ msg->seqno = htonl(msg->seqno); msg->min_seqno = htonl(msg->min_seqno); msg->type = htons(msg->type); - /* value is a double */ - /* weight is a double */ -} - -static void hello_to_hbo(hello_t *hello) { - hello->magic = ntohl(hello->magic); - hello->ident_id = ntohl(hello->ident_id); - hello->port = ntohs(hello->port); -} - -static void hello_to_nbo(hello_t *hello) { - hello->magic = htonl(hello->magic); - hello->ident_id = htonl(hello->ident_id); - hello->port = htons(hello->port); -} - -static int is_connected(remote_limiter_t *remote) { - struct sockaddr_in addr; - socklen_t addrlen = sizeof(addr); - - if (getpeername(remote->socket, (struct sockaddr *) &addr, &addrlen) == 0) - return 1; - else - return 0; + /* ping_source, ping_port, check_target, and check_port already in nbo. */ + msg->checkack_value = htonl(msg->checkack_value); + msg->update_present = htonl(msg->update_present); + msg->reachability = htonl(msg->reachability); + msg->incarnation = htonl(msg->incarnation); + /* node has two fields, both already in nbo. */ + msg->view = htonl(msg->view); } -static int send_ack(identity_t *ident, remote_limiter_t *remote, uint32_t seqno) { +int send_ack(uint32_t id, remote_limiter_t *remote, uint32_t seqno, uint16_t type, int32_t view) { int result = 0; message_t msg; struct sockaddr_in toaddr; @@ -108,9 +99,10 @@ static int send_ack(identity_t *ident, remote_limiter_t *remote, uint32_t seqno) memset(&msg, 0, sizeof(msg)); msg.magic = MAGIC_MSG; - msg.ident_id = ident->id; - msg.type = ACK; + msg.ident_id = id; + msg.type = type; msg.seqno = seqno; + msg.view = view; message_to_nbo(&msg); @@ -171,227 +163,26 @@ void limiter_receive() { return; } - switch (ident->comm.comm_fabric) { - case COMM_MESH: { - /* Use the message's value to be our new GRDrate/FPSweight for the - * message's sender. */ - remote->rate = msg.value; - - /* Reset the AWOL counter to zero since we received an update. */ - remote->awol = 0; - } - break; - - case COMM_GOSSIP: { - if (msg.type == ACK) { - if (msg.seqno == remote->outgoing.next_seqno - 1) { - int i; - - /* Ack for most recent message. Clear saved state. */ - remote->outgoing.first_seqno = remote->outgoing.next_seqno; - remote->outgoing.saved_value = 0; - remote->outgoing.saved_weight = 0; - - for (i = 0; i < ident->comm.gossip.gossip_branch; ++i) { - if (ident->comm.retrys[i] >= 0 && - remote == &ident->comm.remote_limiters[ident->comm.retrys[i]]) { - //printf("clearing spot %d, it was %d\n", i, ident->retrys[i]); - ident->comm.retrys[i] = -2; - } - } - } - /* Ignore ack if it isn't for most recent message. */ - } else { - if (msg.min_seqno > remote->incoming.seen_seqno) { - /* Entirely new information */ - remote->incoming.seen_seqno = msg.seqno; - remote->incoming.saved_value = msg.value; - remote->incoming.saved_weight = msg.weight; - ident->comm.gossip.value += msg.value; - ident->comm.gossip.weight += msg.weight; - send_ack(ident, remote, msg.seqno); - } else if (msg.seqno > remote->incoming.seen_seqno) { - /* Only some of the message is old news. */ - double diff_value = msg.value - remote->incoming.saved_value; - double diff_weight = msg.weight - remote->incoming.saved_weight; - - remote->incoming.seen_seqno = msg.seqno; - remote->incoming.saved_value = msg.value; - remote->incoming.saved_weight = msg.weight; - - ident->comm.gossip.value += diff_value; - ident->comm.gossip.weight += diff_weight; - send_ack(ident, remote, msg.seqno); - } else { - /* The entire message is old news. (Duplicate). */ - /* Do nothing. */ - } - } - } - break; - - default: { - printlog(LOG_CRITICAL, "ERR: Unknown identity comm fabric.\n"); - } - } - + /* Pass the message to the comm's recv function, which is responsible for + * processing its contents. */ + ident->comm.recv_function(&ident->comm, ident->id, limiter.udp_socket, remote, &msg); + pthread_mutex_unlock(&ident->comm.lock); pthread_rwlock_unlock(&limiter.limiter_lock); } -#if 0 -static void limiter_accept(comm_limiter_t *limiter) { - int sock, result; - struct sockaddr_in fromaddr; - socklen_t fromlen = sizeof(fromaddr); - remote_node_t sender; - remote_limiter_t *remote; - hello_t hello; - comm_ident_t *ident; - ident_handle *handle = NULL; - - sock = accept(limiter->tcp_socket, (struct sockaddr *)&fromaddr, &fromlen); - - assert(sock > 0); - - memset(&hello, 0, sizeof(hello_t)); - result = recv(sock, &hello, sizeof(hello_t), 0); - - if (result < 0) { - close(sock); - return; /* Failure - ignore it. */ - } - - assert(result == sizeof(hello_t)); - - hello_to_hbo(&hello); - - assert(hello.magic == MAGIC_HELLO); - - memset(&sender, 0, sizeof(remote_node_t)); - sender.addr = fromaddr.sin_addr.s_addr; - sender.port = ntohs(hello.port); - - pthread_testcancel(); - - pthread_rwlock_rdlock(&limiter->rwlock); - - handle = map_search(limiter->ident_id_to_handle, (void *) &hello.ident_id, sizeof(hello.ident_id)); - - if (handle == NULL) { - printlog(LOG_WARN, "WARN:recvd hello for unknown identity.\n"); - pthread_rwlock_unlock(&limiter->rwlock); - return; - } +int recv_mesh(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) { + /* Use the message's value to be our new GRDrate/FPSweight for the + * message's sender. */ + remote->rate = msg->value; - ident = limiter->identities[*handle]; - assert(ident != NULL); + /* Reset the AWOL counter to zero since we received an update. */ + remote->awol = 0; + remote->reachability = REACHABLE; - pthread_mutex_lock(&ident->lock); - - remote = map_search(ident->remote_node_map, &sender, sizeof(remote_node_t)); - - if (remote == NULL) { - printlog(LOG_WARN, "WARN: Accepted connection from unknown identity.\n"); - pthread_mutex_unlock(&ident->lock); - pthread_rwlock_unlock(&limiter->rwlock); - close(sock); - return; - } - - if (is_connected(remote)) { - /* We are still connected, don't need the new socket. */ - close(sock); - pthread_mutex_unlock(&ident->lock); - pthread_rwlock_unlock(&limiter->rwlock); - return; - } - - /* We weren't connected, but we are now... */ - remote->socket = sock; - printf("Got connection on: %d\n", sock); - FD_SET(sock, &ident->fds); - - pthread_mutex_unlock(&ident->lock); - pthread_rwlock_unlock(&limiter->rwlock); + return 0; } -static void read_tcp_message(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock, int sock) { - int result; - message_t msg; - - memset(&msg, 0, sizeof(message_t)); - - result = recv(sock, &msg, sizeof(message_t), 0); - - if (result < 0) { - pthread_rwlock_rdlock(limiter_rwlock); - pthread_mutex_lock(&ident->lock); - FD_CLR(sock, &ident->fds); - close(sock); - pthread_mutex_unlock(&ident->lock); - pthread_rwlock_unlock(limiter_rwlock); - return; - } - - assert(result == sizeof(message_t)); - - message_to_hbo(&msg); - assert(msg.magic == MAGIC_MSG); - - pthread_rwlock_rdlock(limiter_rwlock); - pthread_mutex_lock(&ident->lock); - - switch (ident->comm_fabric) { - case COMM_GOSSIP: { - ident->gossip.value += msg.value; - ident->gossip.weight += msg.weight; - } - break; - - default: { - assert(1 == 0); /* This case shouldn't happen. Punt for now... */ - } - } - pthread_mutex_unlock(&ident->lock); - pthread_rwlock_unlock(limiter_rwlock); -} - -static void ident_receive(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock) { - int select_result, i; - fd_set fds_copy; - struct timeval timeout; - - FD_ZERO(&fds_copy); - timeout.tv_sec = 15; - timeout.tv_usec = 0; - - pthread_rwlock_rdlock(limiter_rwlock); - pthread_mutex_lock(&ident->lock); - memcpy(&fds_copy, &ident->fds, sizeof(fd_set)); - pthread_mutex_unlock(&ident->lock); - pthread_rwlock_unlock(limiter_rwlock); - - /* mask interrupt signals for this thread? */ - - select_result = select(FD_SETSIZE, &fds_copy, NULL, NULL, &timeout); - - assert(select_result >= 0); - - if (select_result == 0) - return; /* Timed out */ - - for (i = 0; (i < FD_SETSIZE) && select_result; ++i) { - if (FD_ISSET(i, &fds_copy)) { - read_tcp_message(ident, limiter_rwlock, i); - select_result--; - } - } -} -#endif - -#define ALLOW_PARTITION - int send_udp_mesh(comm_t *comm, uint32_t id, int sock) { int result = 0; remote_limiter_t *remote; @@ -414,6 +205,7 @@ int send_udp_mesh(comm_t *comm, uint32_t id, int sock) { msg.magic = MAGIC_MSG; msg.ident_id = id; msg.value = comm->local_rate; + msg.view = comm->gossip.view; /* Do we want seqnos for mesh? We can get by without them. */ message_to_nbo(&msg); @@ -422,6 +214,14 @@ int send_udp_mesh(comm_t *comm, uint32_t id, int sock) { for (i = 0; i < comm->remote_node_count; ++i) { remote = &comm->remote_limiters[i]; + /* Increase this counter. For mesh, it represents the number of messages we have sent to + * this remote limiter without having heard from it. This is reset to 0 when we receive + * an update from this peer. */ + remote->awol += 1; + if (remote->awol > MESH_REMOTE_AWOL_THRESHOLD) { + remote->reachability = UNREACHABLE; + } + #ifdef ALLOW_PARTITION if (do_partition) { @@ -439,6 +239,8 @@ int send_udp_mesh(comm_t *comm, uint32_t id, int sock) { } } + partition_count += 1; + #endif toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */ @@ -449,19 +251,98 @@ int send_udp_mesh(comm_t *comm, uint32_t id, int sock) { printlog(LOG_WARN, " - The error was |%d|\n", strerror(result)); break; } - partition_count += 1; } return result; } +int recv_gossip(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) { + if (msg->type == ACK) { + /* If ACK was received then reset the awol count */ + if (msg->seqno == remote->outgoing.next_seqno - 1) { + /* Ack for most recent message. Clear saved state. */ + remote->outgoing.first_seqno = remote->outgoing.next_seqno; + remote->outgoing.saved_value = 0; + remote->outgoing.saved_weight = 0; + + remote->awol = 0; + } + /* Ignore ack if it isn't for most recent message. */ + } else if (msg->type == MSG) { + if (msg->min_seqno > remote->incoming.seen_seqno) { + /* Entirely new information */ + remote->incoming.seen_seqno = msg->seqno; + remote->incoming.saved_value = msg->value; + remote->incoming.saved_weight = msg->weight; + comm->gossip.value += msg->value; + comm->gossip.weight += msg->weight; + send_ack(id, remote, msg->seqno, ACK, comm->gossip.view); + remote->awol = 0; + } + else if (msg->seqno > remote->incoming.seen_seqno) { + /* Only some of the message is old news. */ + double diff_value = msg->value - remote->incoming.saved_value; + double diff_weight = msg->weight - remote->incoming.saved_weight; + + remote->incoming.seen_seqno = msg->seqno; + remote->incoming.saved_value = msg->value; + remote->incoming.saved_weight = msg->weight; + + comm->gossip.value += diff_value; + comm->gossip.weight += diff_weight; + send_ack(id, remote, msg->seqno, ACK, comm->gossip.view); + remote->awol = 0; + } + else { + /* The entire message is old news. (Duplicate). */ + /* Do nothing. */ + } + } + + return 0; +} + +int find_gossip_target(comm_t *comm) { + int target = NULL_PEER; + int k; + + if (comm->shuffle_index < comm->remote_node_count) { + target = comm->indices[comm->shuffle_index]; + printlog(LOG_DEBUG,"GOSSIP: found index %d.\n", target); + comm->shuffle_index++; + } + else { + // shuffle the remote_limiters array + printlog(LOG_DEBUG, "GOSSIP: shuffling the array.\n"); + for ( k = 0; k < comm->remote_node_count; k++) { + uint32_t l = myrand() % comm->remote_node_count; + int t; + t = comm->indices[l]; + comm->indices[l] = comm->indices[k]; + comm->indices[k] = t; + } + comm->shuffle_index = 0; + target = comm->indices[comm->shuffle_index]; + printlog(LOG_DEBUG,"GOSSIP: found index after spilling over %d.\n", target); + comm->shuffle_index++; + } + return target; +} + int send_udp_gossip(comm_t *comm, uint32_t id, int sock) { - int i, j, targetid; + int i, j; + int retry_index = 0; int result = 0; remote_limiter_t *remote; struct sockaddr_in toaddr; double msg_value, msg_weight; + /* This is the factor for the portion of value/weight to keep locally. + * Normally this is 1, meaning that we retain the same amount of value/weight + * that was sent to the peers. In the case of not being able to send to a + * peer though, we increment this to reclaim the value/weight locally. */ + int message_portion = 1; + memset(&toaddr, 0, sizeof(struct sockaddr_in)); toaddr.sin_family = AF_INET; @@ -469,32 +350,64 @@ int send_udp_gossip(comm_t *comm, uint32_t id, int sock) { msg_weight = comm->gossip.weight / (comm->gossip.gossip_branch + 1); for (i = 0; i < comm->gossip.gossip_branch; ++i) { + int targetid = NULL_PEER; + int rand_count = 0; message_t msg; printlog(LOG_DEBUG, "Gossip loop iteration, i=%d, branch=%d\n", i, comm->gossip.gossip_branch); - if (comm->retrys[i] >= 0) { - remote = &comm->remote_limiters[comm->retrys[i]]; - targetid = comm->retrys[i]; - //printf("%d:d:%d, ", i, comm->retrys[i]); - } else { - targetid = -2; + /* If there are any peers with unacked messages, select them first. */ + while (retry_index < comm->remote_node_count) { + if (comm->remote_limiters[retry_index].awol > 0 && comm->remote_limiters[retry_index].reachability == REACHABLE) { + targetid = retry_index; + printlog(LOG_DEBUG, "GOSSIP: Selected peerindex %d because it had unacked messages.\n", targetid); + } + + retry_index += 1; + } + + while (targetid == NULL_PEER && rand_count < 10) { + /* Select a recipient from a randomly-shuffled array. */ + targetid = find_gossip_target(comm); - while (targetid == -2) { - targetid = myrand() % comm->remote_node_count; + assert(targetid != NULL_PEER); + + /* Don't select an already-used index. */ + for (j = 0; j < i; ++j) { + if (targetid == comm->selected[j]) { + printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d. selected[j=%d] is %d\n", targetid, j, comm->selected[j]); + targetid = NULL_PEER; + break; + } + } - for (j = 0; j < comm->gossip.gossip_branch; ++j) { - if (targetid == comm->retrys[j]) { - targetid = -2; - break; - } + /* Don't select an unreachable peer or one that is not in our view. */ + if (targetid != NULL_PEER) { + if (comm->remote_limiters[targetid].reachability != REACHABLE || + comm->remote_limiters[targetid].view != comm->gossip.view || + comm->remote_limiters[targetid].view_confidence != IN) { + printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d, reachability is %d, remote view is %d (confidence:%d), my view is %d\n", + targetid, comm->remote_limiters[targetid].reachability, comm->remote_limiters[targetid].view, + comm->remote_limiters[targetid].view_confidence, comm->gossip.view); + targetid = NULL_PEER; } } - remote = &comm->remote_limiters[targetid]; - //printf("%d:r:%d, ", i, targetid); + rand_count++; } + if (targetid == NULL_PEER) { + /* Couldn't find a suitable peer to send to... */ + message_portion += 1; + printlog(LOG_DEBUG, "GOSSIP: exhausted random peer search.\n"); + continue; + } else { + printlog(LOG_DEBUG, "GOSSIP: settled on peer id %d.\n", targetid); + } + + remote = &comm->remote_limiters[targetid]; + comm->selected[i] = targetid; + toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */ toaddr.sin_port = remote->port; @@ -506,10 +419,23 @@ int send_udp_gossip(comm_t *comm, uint32_t id, int sock) { msg.seqno = remote->outgoing.next_seqno; msg.min_seqno = remote->outgoing.first_seqno; msg.type = MSG; - + msg.view = comm->gossip.view; + remote->outgoing.next_seqno++; remote->outgoing.saved_value += msg_value; remote->outgoing.saved_weight += msg_weight; + /* Represents the number of messages we have sent to this peer without hearing a message or ACK from it. */ + remote->awol += 1; + + +#ifdef ALLOW_PARTITION + + if (do_partition && ((partition_set & (1 << targetid)) == 0)) { + printlog(LOG_DEBUG, "Partition: GOSSIP: ignoring targetid %d\n", targetid); + continue; + } + +#endif message_to_nbo(&msg); @@ -518,18 +444,38 @@ int send_udp_gossip(comm_t *comm, uint32_t id, int sock) { result = errno; break; } - - comm->retrys[i] = targetid; } - //printf("\n"); - - comm->gossip.value = msg_value; - comm->gossip.weight = msg_weight; + comm->gossip.value = msg_value * message_portion; + comm->gossip.weight = msg_weight * message_portion; return result; } + +/* Old TCP code. */ #if 0 +static void hello_to_hbo(hello_t *hello) { + hello->magic = ntohl(hello->magic); + hello->ident_id = ntohl(hello->ident_id); + hello->port = ntohs(hello->port); +} + +static void hello_to_nbo(hello_t *hello) { + hello->magic = htonl(hello->magic); + hello->ident_id = htonl(hello->ident_id); + hello->port = htons(hello->port); +} + +static int is_connected(remote_limiter_t *remote) { + struct sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + + if (getpeername(remote->socket, (struct sockaddr *) &addr, &addrlen) == 0) + return 1; + else + return 0; +} + int send_tcp_gossip(comm_ident_t *ident, FILE *logfile, int unused) { int i, targetid, sock; int result = 0; @@ -563,9 +509,7 @@ int send_tcp_gossip(comm_ident_t *ident, FILE *logfile, int unused) { return result; } -#endif -#if 0 void *limiter_accept_thread(void *limiter) { sigset_t signal_mask; @@ -648,4 +592,153 @@ void *ident_receive_thread(void *recv_args) { } pthread_exit(NULL); } + +static void limiter_accept(comm_limiter_t *limiter) { + int sock, result; + struct sockaddr_in fromaddr; + socklen_t fromlen = sizeof(fromaddr); + remote_node_t sender; + remote_limiter_t *remote; + hello_t hello; + comm_ident_t *ident; + dent_handle *handle = NULL; + + sock = accept(limiter->tcp_socket, (struct sockaddr *)&fromaddr, &fromlen); + + assert(sock > 0); + + memset(&hello, 0, sizeof(hello_t)); + result = recv(sock, &hello, sizeof(hello_t), 0); + + if (result < 0) { + close(sock); + return; /* Failure - ignore it. */ + } + + assert(result == sizeof(hello_t)); + + hello_to_hbo(&hello); + + assert(hello.magic == MAGIC_HELLO); + + memset(&sender, 0, sizeof(remote_node_t)); + sender.addr = fromaddr.sin_addr.s_addr; + sender.port = ntohs(hello.port); + + pthread_testcancel(); + + pthread_rwlock_rdlock(&limiter->rwlock); + + handle = map_search(limiter->ident_id_to_handle, (void *) &hello.ident_id, sizeof(hello.ident_id)); + + if (handle == NULL) { + printlog(LOG_WARN, "WARN:recvd hello for unknown identity.\n"); + pthread_rwlock_unlock(&limiter->rwlock); + return; + } + + ident = limiter->identities[*handle]; + assert(ident != NULL); + + pthread_mutex_lock(&ident->lock); + + remote = map_search(ident->remote_node_map, &sender, sizeof(remote_node_t)); + + if (remote == NULL) { + printlog(LOG_WARN, "WARN: Accepted connection from unknown identity.\n"); + pthread_mutex_unlock(&ident->lock); + pthread_rwlock_unlock(&limiter->rwlock); + close(sock); + return; + } + + if (is_connected(remote)) { + /* We are still connected, don't need the new socket. */ + close(sock); + pthread_mutex_unlock(&ident->lock); + pthread_rwlock_unlock(&limiter->rwlock); + return; + } + + /* We weren't connected, but we are now... */ + remote->socket = sock; + printf("Got connection on: %d\n", sock); + FD_SET(sock, &ident->fds); + + pthread_mutex_unlock(&ident->lock); + pthread_rwlock_unlock(&limiter->rwlock); +} + +static void read_tcp_message(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock, int sock) { + int result; + message_t msg; + + memset(&msg, 0, sizeof(message_t)); + + result = recv(sock, &msg, sizeof(message_t), 0); + + if (result < 0) { + pthread_rwlock_rdlock(limiter_rwlock); + pthread_mutex_lock(&ident->lock); + FD_CLR(sock, &ident->fds); + close(sock); + pthread_mutex_unlock(&ident->lock); + pthread_rwlock_unlock(limiter_rwlock); + return; + } + + assert(result == sizeof(message_t)); + + message_to_hbo(&msg); + assert(msg.magic == MAGIC_MSG); + + pthread_rwlock_rdlock(limiter_rwlock); + pthread_mutex_lock(&ident->lock); + + switch (ident->comm_fabric) { + case COMM_GOSSIP: { + ident->gossip.value += msg.value; + ident->gossip.weight += msg.weight; + } + break; + + default: { + assert(1 == 0); /* This case shouldn't happen. Punt for now... */ + } + } + pthread_mutex_unlock(&ident->lock); + pthread_rwlock_unlock(limiter_rwlock); +} + +static void ident_receive(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock) { + int select_result, i; + fd_set fds_copy; + struct timeval timeout; + + FD_ZERO(&fds_copy); + timeout.tv_sec = 15; + timeout.tv_usec = 0; + + pthread_rwlock_rdlock(limiter_rwlock); + pthread_mutex_lock(&ident->lock); + memcpy(&fds_copy, &ident->fds, sizeof(fd_set)); + pthread_mutex_unlock(&ident->lock); + pthread_rwlock_unlock(limiter_rwlock); + + /* mask interrupt signals for this thread? */ + + select_result = select(FD_SETSIZE, &fds_copy, NULL, NULL, &timeout); + + assert(select_result >= 0); + + if (select_result == 0) + return; /* Timed out */ + + for (i = 0; (i < FD_SETSIZE) && select_result; ++i) { + if (FD_ISSET(i, &fds_copy)) { + read_tcp_message(ident, limiter_rwlock, i); + select_result--; + } + } +} #endif