+
+static void limiter_accept(comm_limiter_t *limiter) {
+ int sock, result;
+ struct sockaddr_in fromaddr;
+ socklen_t fromlen = sizeof(fromaddr);
+ remote_node_t sender;
+ remote_limiter_t *remote;
+ hello_t hello;
+ comm_ident_t *ident;
+ dent_handle *handle = NULL;
+
+ sock = accept(limiter->tcp_socket, (struct sockaddr *)&fromaddr, &fromlen);
+
+ assert(sock > 0);
+
+ memset(&hello, 0, sizeof(hello_t));
+ result = recv(sock, &hello, sizeof(hello_t), 0);
+
+ if (result < 0) {
+ close(sock);
+ return; /* Failure - ignore it. */
+ }
+
+ assert(result == sizeof(hello_t));
+
+ hello_to_hbo(&hello);
+
+ assert(hello.magic == MAGIC_HELLO);
+
+ memset(&sender, 0, sizeof(remote_node_t));
+ sender.addr = fromaddr.sin_addr.s_addr;
+ sender.port = ntohs(hello.port);
+
+ pthread_testcancel();
+
+ pthread_rwlock_rdlock(&limiter->rwlock);
+
+ handle = map_search(limiter->ident_id_to_handle, (void *) &hello.ident_id, sizeof(hello.ident_id));
+
+ if (handle == NULL) {
+ printlog(LOG_WARN, "WARN:recvd hello for unknown identity.\n");
+ pthread_rwlock_unlock(&limiter->rwlock);
+ return;
+ }
+
+ ident = limiter->identities[*handle];
+ assert(ident != NULL);
+
+ pthread_mutex_lock(&ident->lock);
+
+ remote = map_search(ident->remote_node_map, &sender, sizeof(remote_node_t));
+
+ if (remote == NULL) {
+ printlog(LOG_WARN, "WARN: Accepted connection from unknown identity.\n");
+ pthread_mutex_unlock(&ident->lock);
+ pthread_rwlock_unlock(&limiter->rwlock);
+ close(sock);
+ return;
+ }
+
+ if (is_connected(remote)) {
+ /* We are still connected, don't need the new socket. */
+ close(sock);
+ pthread_mutex_unlock(&ident->lock);
+ pthread_rwlock_unlock(&limiter->rwlock);
+ return;
+ }
+
+ /* We weren't connected, but we are now... */
+ remote->socket = sock;
+ printf("Got connection on: %d\n", sock);
+ FD_SET(sock, &ident->fds);
+
+ pthread_mutex_unlock(&ident->lock);
+ pthread_rwlock_unlock(&limiter->rwlock);
+}
+
+static void read_tcp_message(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock, int sock) {
+ int result;
+ message_t msg;
+
+ memset(&msg, 0, sizeof(message_t));
+
+ result = recv(sock, &msg, sizeof(message_t), 0);
+
+ if (result < 0) {
+ pthread_rwlock_rdlock(limiter_rwlock);
+ pthread_mutex_lock(&ident->lock);
+ FD_CLR(sock, &ident->fds);
+ close(sock);
+ pthread_mutex_unlock(&ident->lock);
+ pthread_rwlock_unlock(limiter_rwlock);
+ return;
+ }
+
+ assert(result == sizeof(message_t));
+
+ message_to_hbo(&msg);
+ assert(msg.magic == MAGIC_MSG);
+
+ pthread_rwlock_rdlock(limiter_rwlock);
+ pthread_mutex_lock(&ident->lock);
+
+ switch (ident->comm_fabric) {
+ case COMM_GOSSIP: {
+ ident->gossip.value += msg.value;
+ ident->gossip.weight += msg.weight;
+ }
+ break;
+
+ default: {
+ assert(1 == 0); /* This case shouldn't happen. Punt for now... */
+ }
+ }
+ pthread_mutex_unlock(&ident->lock);
+ pthread_rwlock_unlock(limiter_rwlock);
+}
+
+static void ident_receive(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock) {
+ int select_result, i;
+ fd_set fds_copy;
+ struct timeval timeout;
+
+ FD_ZERO(&fds_copy);
+ timeout.tv_sec = 15;
+ timeout.tv_usec = 0;
+
+ pthread_rwlock_rdlock(limiter_rwlock);
+ pthread_mutex_lock(&ident->lock);
+ memcpy(&fds_copy, &ident->fds, sizeof(fd_set));
+ pthread_mutex_unlock(&ident->lock);
+ pthread_rwlock_unlock(limiter_rwlock);
+
+ /* mask interrupt signals for this thread? */
+
+ select_result = select(FD_SETSIZE, &fds_copy, NULL, NULL, &timeout);
+
+ assert(select_result >= 0);
+
+ if (select_result == 0)
+ return; /* Timed out */
+
+ for (i = 0; (i < FD_SETSIZE) && select_result; ++i) {
+ if (FD_ISSET(i, &fds_copy)) {
+ read_tcp_message(ident, limiter_rwlock, i);
+ select_result--;
+ }
+ }
+}