/* See the DRL-LICENSE file for this file's software license. */ #define _XOPEN_SOURCE 600 /* Debug output. */ #include #include /* Socket functions. */ #include #include /* Byte ordering and address structures. */ #include /* memset() */ #include /* close() & usleep */ #include /* Mutex lock/unlock. */ #include /* perror() */ #include /* select() w/ timeout */ #include #include /* assert() */ #include /* sigaddset(), sigemptyset(), SIGHUP, etc. */ #include /* DRL data structures. */ #include "raterouter.h" #include "ratetypes.h" #include "drl_state.h" #include "peer_comm.h" #include "logging.h" /* Artifically makes a network partition. */ int do_partition = 0; int partition_set = 0xfffffff; extern limiter_t limiter; static const uint32_t MAGIC_MSG = 0x123123; static const uint32_t MAGIC_HELLO = 0x456456; static const uint16_t MSG = 1; static const uint16_t ACK = 2; static void message_to_hbo(message_t *msg) { msg->magic = ntohl(msg->magic); msg->ident_id = ntohl(msg->ident_id); msg->seqno = ntohl(msg->seqno); msg->min_seqno = ntohl(msg->min_seqno); msg->type = ntohs(msg->type); /* value is a double */ /* weight is a double */ } static void message_to_nbo(message_t *msg) { msg->magic = htonl(msg->magic); msg->ident_id = htonl(msg->ident_id); msg->seqno = htonl(msg->seqno); msg->min_seqno = htonl(msg->min_seqno); msg->type = htons(msg->type); /* value is a double */ /* weight is a double */ } static void hello_to_hbo(hello_t *hello) { hello->magic = ntohl(hello->magic); hello->ident_id = ntohl(hello->ident_id); hello->port = ntohs(hello->port); } static void hello_to_nbo(hello_t *hello) { hello->magic = htonl(hello->magic); hello->ident_id = htonl(hello->ident_id); hello->port = htons(hello->port); } static int is_connected(remote_limiter_t *remote) { struct sockaddr_in addr; socklen_t addrlen = sizeof(addr); if (getpeername(remote->socket, (struct sockaddr *) &addr, &addrlen) == 0) return 1; else return 0; } static int send_ack(identity_t *ident, remote_limiter_t *remote, uint32_t seqno) { int result = 0; message_t msg; struct sockaddr_in toaddr; memset(&toaddr, 0, sizeof(struct sockaddr_in)); toaddr.sin_family = AF_INET; toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */ toaddr.sin_port = remote->port; memset(&msg, 0, sizeof(msg)); msg.magic = MAGIC_MSG; msg.ident_id = ident->id; msg.type = ACK; msg.seqno = seqno; message_to_nbo(&msg); if (sendto(limiter.udp_socket, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) { printlog(LOG_WARN, "send_ack: sento failed.\n"); result = errno; } return result; } void limiter_receive() { struct sockaddr_in fromaddr; remote_node_t sender; socklen_t fromlen = sizeof(fromaddr); identity_t *ident = NULL; remote_limiter_t *remote = NULL; message_t msg; if (recvfrom(limiter.udp_socket, &msg, sizeof(msg), MSG_WAITALL, (struct sockaddr *) &fromaddr, (socklen_t *) &fromlen) != sizeof(msg)) { /* recv failed. Log and continue. */ printlog(LOG_WARN, "recv failed to read full message.\n"); return; } memset(&sender, 0, sizeof(remote_node_t)); sender.addr = fromaddr.sin_addr.s_addr; sender.port = fromaddr.sin_port; message_to_hbo(&msg); assert(msg.magic == MAGIC_MSG); #if 0 printlog(LOG_WARN, "Rcvd (value, weight) : (%f, %f) from ident %d (net order host 0x%x port %d) key size(%d)\n", msg.value, msg.weight, msg.ident_id, sender.addr,sender.port,sizeof(remote_node_t)); #endif pthread_testcancel(); pthread_rwlock_rdlock(&limiter.limiter_lock); ident = map_search(limiter.stable_instance.ident_map, &msg.ident_id, sizeof(msg.ident_id)); if (ident == NULL) { printlog(LOG_WARN, "WARN:recvd message for unknown identity.\n"); pthread_rwlock_unlock(&limiter.limiter_lock); return; } pthread_mutex_lock(&ident->comm.lock); remote = map_search(ident->comm.remote_node_map, &sender, sizeof(remote_node_t)); if (remote == NULL) { printlog(LOG_WARN, "WARN: recvd msg from unknown entity.\n"); pthread_mutex_unlock(&ident->comm.lock); pthread_rwlock_unlock(&limiter.limiter_lock); return; } switch (ident->comm.comm_fabric) { case COMM_MESH: { /* Use the message's value to be our new GRDrate/FPSweight for the * message's sender. */ remote->rate = msg.value; /* Reset the AWOL counter to zero since we received an update. */ remote->awol = 0; } break; case COMM_GOSSIP: { if (msg.type == ACK) { if (msg.seqno == remote->outgoing.next_seqno - 1) { int i; /* Ack for most recent message. Clear saved state. */ remote->outgoing.first_seqno = remote->outgoing.next_seqno; remote->outgoing.saved_value = 0; remote->outgoing.saved_weight = 0; for (i = 0; i < ident->comm.gossip.gossip_branch; ++i) { if (ident->comm.retrys[i] >= 0 && remote == &ident->comm.remote_limiters[ident->comm.retrys[i]]) { //printf("clearing spot %d, it was %d\n", i, ident->retrys[i]); ident->comm.retrys[i] = -2; } } } /* Ignore ack if it isn't for most recent message. */ } else { if (msg.min_seqno > remote->incoming.seen_seqno) { /* Entirely new information */ remote->incoming.seen_seqno = msg.seqno; remote->incoming.saved_value = msg.value; remote->incoming.saved_weight = msg.weight; ident->comm.gossip.value += msg.value; ident->comm.gossip.weight += msg.weight; send_ack(ident, remote, msg.seqno); } else if (msg.seqno > remote->incoming.seen_seqno) { /* Only some of the message is old news. */ double diff_value = msg.value - remote->incoming.saved_value; double diff_weight = msg.weight - remote->incoming.saved_weight; remote->incoming.seen_seqno = msg.seqno; remote->incoming.saved_value = msg.value; remote->incoming.saved_weight = msg.weight; ident->comm.gossip.value += diff_value; ident->comm.gossip.weight += diff_weight; send_ack(ident, remote, msg.seqno); } else { /* The entire message is old news. (Duplicate). */ /* Do nothing. */ } } } break; default: { printlog(LOG_CRITICAL, "ERR: Unknown identity comm fabric.\n"); } } pthread_mutex_unlock(&ident->comm.lock); pthread_rwlock_unlock(&limiter.limiter_lock); } #if 0 static void limiter_accept(comm_limiter_t *limiter) { int sock, result; struct sockaddr_in fromaddr; socklen_t fromlen = sizeof(fromaddr); remote_node_t sender; remote_limiter_t *remote; hello_t hello; comm_ident_t *ident; ident_handle *handle = NULL; sock = accept(limiter->tcp_socket, (struct sockaddr *)&fromaddr, &fromlen); assert(sock > 0); memset(&hello, 0, sizeof(hello_t)); result = recv(sock, &hello, sizeof(hello_t), 0); if (result < 0) { close(sock); return; /* Failure - ignore it. */ } assert(result == sizeof(hello_t)); hello_to_hbo(&hello); assert(hello.magic == MAGIC_HELLO); memset(&sender, 0, sizeof(remote_node_t)); sender.addr = fromaddr.sin_addr.s_addr; sender.port = ntohs(hello.port); pthread_testcancel(); pthread_rwlock_rdlock(&limiter->rwlock); handle = map_search(limiter->ident_id_to_handle, (void *) &hello.ident_id, sizeof(hello.ident_id)); if (handle == NULL) { printlog(LOG_WARN, "WARN:recvd hello for unknown identity.\n"); pthread_rwlock_unlock(&limiter->rwlock); return; } ident = limiter->identities[*handle]; assert(ident != NULL); pthread_mutex_lock(&ident->lock); remote = map_search(ident->remote_node_map, &sender, sizeof(remote_node_t)); if (remote == NULL) { printlog(LOG_WARN, "WARN: Accepted connection from unknown identity.\n"); pthread_mutex_unlock(&ident->lock); pthread_rwlock_unlock(&limiter->rwlock); close(sock); return; } if (is_connected(remote)) { /* We are still connected, don't need the new socket. */ close(sock); pthread_mutex_unlock(&ident->lock); pthread_rwlock_unlock(&limiter->rwlock); return; } /* We weren't connected, but we are now... */ remote->socket = sock; printf("Got connection on: %d\n", sock); FD_SET(sock, &ident->fds); pthread_mutex_unlock(&ident->lock); pthread_rwlock_unlock(&limiter->rwlock); } static void read_tcp_message(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock, int sock) { int result; message_t msg; memset(&msg, 0, sizeof(message_t)); result = recv(sock, &msg, sizeof(message_t), 0); if (result < 0) { pthread_rwlock_rdlock(limiter_rwlock); pthread_mutex_lock(&ident->lock); FD_CLR(sock, &ident->fds); close(sock); pthread_mutex_unlock(&ident->lock); pthread_rwlock_unlock(limiter_rwlock); return; } assert(result == sizeof(message_t)); message_to_hbo(&msg); assert(msg.magic == MAGIC_MSG); pthread_rwlock_rdlock(limiter_rwlock); pthread_mutex_lock(&ident->lock); switch (ident->comm_fabric) { case COMM_GOSSIP: { ident->gossip.value += msg.value; ident->gossip.weight += msg.weight; } break; default: { assert(1 == 0); /* This case shouldn't happen. Punt for now... */ } } pthread_mutex_unlock(&ident->lock); pthread_rwlock_unlock(limiter_rwlock); } static void ident_receive(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock) { int select_result, i; fd_set fds_copy; struct timeval timeout; FD_ZERO(&fds_copy); timeout.tv_sec = 15; timeout.tv_usec = 0; pthread_rwlock_rdlock(limiter_rwlock); pthread_mutex_lock(&ident->lock); memcpy(&fds_copy, &ident->fds, sizeof(fd_set)); pthread_mutex_unlock(&ident->lock); pthread_rwlock_unlock(limiter_rwlock); /* mask interrupt signals for this thread? */ select_result = select(FD_SETSIZE, &fds_copy, NULL, NULL, &timeout); assert(select_result >= 0); if (select_result == 0) return; /* Timed out */ for (i = 0; (i < FD_SETSIZE) && select_result; ++i) { if (FD_ISSET(i, &fds_copy)) { read_tcp_message(ident, limiter_rwlock, i); select_result--; } } } #endif #define ALLOW_PARTITION int send_udp_mesh(comm_t *comm, uint32_t id, int sock) { int result = 0; remote_limiter_t *remote; message_t msg; struct sockaddr_in toaddr; int i; #ifdef ALLOW_PARTITION int partition_count = 0; struct in_addr dest; char dest_ip[22]; #endif memset(&toaddr, 0, sizeof(struct sockaddr_in)); toaddr.sin_family = AF_INET; memset(&msg, 0, sizeof(message_t)); msg.magic = MAGIC_MSG; msg.ident_id = id; msg.value = comm->local_rate; /* Do we want seqnos for mesh? We can get by without them. */ message_to_nbo(&msg); /* Iterate though and send update to all remote limiters in our identity. */ for (i = 0; i < comm->remote_node_count; ++i) { remote = &comm->remote_limiters[i]; #ifdef ALLOW_PARTITION if (do_partition) { printlog(LOG_DEBUG, "Testing partition, partition set is %x, count is %d, test is %d.\n", partition_set, partition_count, partition_set & (1 << partition_count)); /* If the partition count bit isn't high in the set, don't actually send anything. */ if ((partition_set & (1 << partition_count)) == 0) { dest.s_addr = ntohl(remote->addr); strcpy(dest_ip, inet_ntoa(dest)); printlog(LOG_DEBUG, "Partition: ignoring host %s\n", dest_ip); partition_count += 1; continue; } } #endif toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */ toaddr.sin_port = remote->port; if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) { printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n"); result = errno; printlog(LOG_WARN, " - The error was |%d|\n", strerror(result)); break; } partition_count += 1; } return result; } int send_udp_gossip(comm_t *comm, uint32_t id, int sock) { int i, j, targetid; int result = 0; remote_limiter_t *remote; struct sockaddr_in toaddr; double msg_value, msg_weight; memset(&toaddr, 0, sizeof(struct sockaddr_in)); toaddr.sin_family = AF_INET; msg_value = comm->gossip.value / (comm->gossip.gossip_branch + 1); msg_weight = comm->gossip.weight / (comm->gossip.gossip_branch + 1); for (i = 0; i < comm->gossip.gossip_branch; ++i) { message_t msg; printlog(LOG_DEBUG, "Gossip loop iteration, i=%d, branch=%d\n", i, comm->gossip.gossip_branch); if (comm->retrys[i] >= 0) { remote = &comm->remote_limiters[comm->retrys[i]]; targetid = comm->retrys[i]; //printf("%d:d:%d, ", i, comm->retrys[i]); } else { targetid = -2; while (targetid == -2) { targetid = myrand() % comm->remote_node_count; for (j = 0; j < comm->gossip.gossip_branch; ++j) { if (targetid == comm->retrys[j]) { targetid = -2; break; } } } remote = &comm->remote_limiters[targetid]; //printf("%d:r:%d, ", i, targetid); } toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */ toaddr.sin_port = remote->port; memset(&msg, 0, sizeof(message_t)); msg.magic = MAGIC_MSG; msg.ident_id = id; msg.value = msg_value + remote->outgoing.saved_value; msg.weight = msg_weight + remote->outgoing.saved_weight; msg.seqno = remote->outgoing.next_seqno; msg.min_seqno = remote->outgoing.first_seqno; msg.type = MSG; remote->outgoing.next_seqno++; remote->outgoing.saved_value += msg_value; remote->outgoing.saved_weight += msg_weight; message_to_nbo(&msg); if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) { printlog(LOG_WARN, "WARN: limiter_send_gossip: sento failed.\n"); result = errno; break; } comm->retrys[i] = targetid; } //printf("\n"); comm->gossip.value = msg_value; comm->gossip.weight = msg_weight; return result; } #if 0 int send_tcp_gossip(comm_ident_t *ident, FILE *logfile, int unused) { int i, targetid, sock; int result = 0; message_t msg; memset(&msg, 0, sizeof(message_t)); msg.magic = MAGIC_MSG; msg.ident_id = ident->ident_id; msg.value = ident->gossip.value / (ident->gossip.gossip_branch + 1); msg.weight = ident->gossip.weight / (ident->gossip.gossip_branch + 1); message_to_nbo(&msg); for (i = 0; i < ident->gossip.gossip_branch; ++i) { targetid = myrand() % ident->remote_node_count; sock = ident->remote_limiters[targetid].socket; result = send(sock, &msg, sizeof(message_t), 0); if (result < 0) { result = errno; FD_CLR(sock, &ident->fds); close(sock); break; } assert(result == sizeof(message_t)); } ident->gossip.value /= (ident->gossip.gossip_branch + 1); ident->gossip.weight /= (ident->gossip.gossip_branch + 1); return result; } #endif #if 0 void *limiter_accept_thread(void *limiter) { sigset_t signal_mask; sigemptyset(&signal_mask); sigaddset(&signal_mask, SIGHUP); pthread_sigmask(SIG_BLOCK, &signal_mask, NULL); pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); while (1) { limiter_accept((comm_limiter_t *) limiter); } pthread_exit(NULL); } void *ident_receive_thread(void *recv_args) { int i, sock, result; struct recv_thread_args *args = (struct recv_thread_args *) recv_args; comm_ident_t *ident = args->ident; pthread_rwlock_t *lock = args->lock; uint16_t port = args->port; struct sockaddr_in addr; socklen_t addrlen = sizeof(addr); hello_t hello; free(args); while (1) { memset(&hello, 0, sizeof(hello_t)); /*Try to connect to all remote nodes if they aren't already connected.*/ pthread_rwlock_rdlock(lock); pthread_mutex_lock(&ident->lock); hello.magic = MAGIC_HELLO; hello.ident_id = ident->ident_id; hello.port = ntohs(port); hello_to_nbo(&hello); for (i = 0; i < ident->remote_node_count; ++i) { if (is_connected(&ident->remote_limiters[i])) continue; /* Ignore if it's already connected */ sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (sock < 0) { perror("socket"); continue; } assert(sock >= 0); memset(&addr, 0, sizeof(struct sockaddr_in)); addr.sin_family = AF_INET; addr.sin_port = ident->remote_limiters[i].port; addr.sin_addr.s_addr = ident->remote_limiters[i].addr; result = connect(sock, (struct sockaddr *) &addr, addrlen); if (result < 0) { close(sock); continue; } result = send(sock, &hello, sizeof(hello_t), 0); if (result < 0) { close(sock); continue; } assert(result == sizeof(hello_t)); ident->remote_limiters[i].socket = sock; printf("Connected on socket: %d\n", sock); FD_SET(sock, &ident->fds); } pthread_rwlock_unlock(lock); pthread_mutex_unlock(&ident->lock); ident_receive(ident, lock); } pthread_exit(NULL); } #endif