X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=drl%2Fpeer_comm.c;fp=drl%2Fpeer_comm.c;h=cb306574680c90557a9789b7c9634887207ee5c0;hb=0be9704d6b24d09ebd55beedec52758cb88c570b;hp=0000000000000000000000000000000000000000;hpb=6747e89080a8265aa73320bd9f40a0fa6e1c161e;p=distributedratelimiting.git diff --git a/drl/peer_comm.c b/drl/peer_comm.c new file mode 100644 index 0000000..cb30657 --- /dev/null +++ b/drl/peer_comm.c @@ -0,0 +1,613 @@ +/* See the DRL-LICENSE file for this file's software license. */ + +#define _XOPEN_SOURCE 600 + +/* Debug output. */ +#include +#include + +/* Socket functions. */ +#include +#include + +/* Byte ordering and address structures. */ +#include + +/* memset() */ +#include + +/* close() & usleep */ +#include + +/* Mutex lock/unlock. */ +#include + +/* perror() */ +#include + +/* select() w/ timeout */ +#include +#include + +/* assert() */ +#include + +/* sigaddset(), sigemptyset(), SIGHUP, etc. */ +#include + +/* DRL data structures. */ +#include "raterouter.h" +#include "ratetypes.h" +#include "drl_state.h" +#include "peer_comm.h" +#include "logging.h" + +extern limiter_t limiter; + +static const uint32_t MAGIC_MSG = 0x123123; +static const uint32_t MAGIC_HELLO = 0x456456; +static const uint16_t MSG = 1; +static const uint16_t ACK = 2; + +static void message_to_hbo(message_t *msg) { + msg->magic = ntohl(msg->magic); + msg->ident_id = ntohl(msg->ident_id); + msg->seqno = ntohl(msg->seqno); + msg->min_seqno = ntohl(msg->min_seqno); + msg->type = ntohs(msg->type); + /* value is a double */ + /* weight is a double */ +} + +static void message_to_nbo(message_t *msg) { + msg->magic = htonl(msg->magic); + msg->ident_id = htonl(msg->ident_id); + msg->seqno = htonl(msg->seqno); + msg->min_seqno = htonl(msg->min_seqno); + msg->type = htons(msg->type); + /* value is a double */ + /* weight is a double */ +} + +static void hello_to_hbo(hello_t *hello) { + hello->magic = ntohl(hello->magic); + hello->ident_id = ntohl(hello->ident_id); + hello->port = ntohs(hello->port); +} + +static void hello_to_nbo(hello_t *hello) { + hello->magic = htonl(hello->magic); + hello->ident_id = htonl(hello->ident_id); + hello->port = htons(hello->port); +} + +static int is_connected(remote_limiter_t *remote) { + struct sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + + if (getpeername(remote->socket, (struct sockaddr *) &addr, &addrlen) == 0) + return 1; + else + return 0; +} + +static int send_ack(identity_t *ident, remote_limiter_t *remote, uint32_t seqno) { + int result = 0; + message_t msg; + struct sockaddr_in toaddr; + + memset(&toaddr, 0, sizeof(struct sockaddr_in)); + toaddr.sin_family = AF_INET; + + toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */ + toaddr.sin_port = remote->port; + + memset(&msg, 0, sizeof(msg)); + msg.magic = MAGIC_MSG; + msg.ident_id = ident->id; + msg.type = ACK; + msg.seqno = seqno; + + message_to_nbo(&msg); + + if (sendto(limiter.udp_socket, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) { + printlog(LOG_WARN, "send_ack: sento failed.\n"); + result = errno; + } + + return result; +} + +void limiter_receive() { + struct sockaddr_in fromaddr; + remote_node_t sender; + socklen_t fromlen = sizeof(fromaddr); + identity_t *ident = NULL; + remote_limiter_t *remote = NULL; + message_t msg; + + if (recvfrom(limiter.udp_socket, &msg, sizeof(msg), MSG_WAITALL, (struct sockaddr *) &fromaddr, (socklen_t *) &fromlen) != sizeof(msg)) { + /* recv failed. Log and continue. */ + printlog(LOG_WARN, "recv failed to read full message.\n"); + return; + } + memset(&sender, 0, sizeof(remote_node_t)); + sender.addr = fromaddr.sin_addr.s_addr; + sender.port = fromaddr.sin_port; + + message_to_hbo(&msg); + + assert(msg.magic == MAGIC_MSG); + +#if 0 + printlog(LOG_WARN, "Rcvd (value, weight) : (%f, %f) from ident %d (net order host 0x%x port %d) key size(%d)\n", + msg.value, msg.weight, msg.ident_id, sender.addr,sender.port,sizeof(remote_node_t)); +#endif + pthread_testcancel(); + + pthread_rwlock_rdlock(&limiter.limiter_lock); + + ident = map_search(limiter.stable_instance.ident_map, &msg.ident_id, + sizeof(msg.ident_id)); + + if (ident == NULL) { + printlog(LOG_WARN, "WARN:recvd message for unknown identity.\n"); + pthread_rwlock_unlock(&limiter.limiter_lock); + return; + } + + pthread_mutex_lock(&ident->comm.lock); + + remote = map_search(ident->comm.remote_node_map, &sender, sizeof(remote_node_t)); + + if (remote == NULL) { + printlog(LOG_WARN, "WARN: recvd msg from unknown entity.\n"); + pthread_mutex_unlock(&ident->comm.lock); + pthread_rwlock_unlock(&limiter.limiter_lock); + return; + } + + switch (ident->comm.comm_fabric) { + case COMM_MESH: { + /* Use the message's value to be our new GRDrate/FPSweight for the + * message's sender. */ + remote->rate = msg.value; + + /* Reset the AWOL counter to zero since we received an update. */ + remote->awol = 0; + } + break; + + case COMM_GOSSIP: { + if (msg.type == ACK) { + if (msg.seqno == remote->outgoing.next_seqno - 1) { + int i; + + /* Ack for most recent message. Clear saved state. */ + remote->outgoing.first_seqno = remote->outgoing.next_seqno; + remote->outgoing.saved_value = 0; + remote->outgoing.saved_weight = 0; + + for (i = 0; i < ident->comm.gossip.gossip_branch; ++i) { + if (ident->comm.retrys[i] >= 0 && + remote == &ident->comm.remote_limiters[ident->comm.retrys[i]]) { + //printf("clearing spot %d, it was %d\n", i, ident->retrys[i]); + ident->comm.retrys[i] = -2; + } + } + } + /* Ignore ack if it isn't for most recent message. */ + } else { + if (msg.min_seqno > remote->incoming.seen_seqno) { + /* Entirely new information */ + remote->incoming.seen_seqno = msg.seqno; + remote->incoming.saved_value = msg.value; + remote->incoming.saved_weight = msg.weight; + ident->comm.gossip.value += msg.value; + ident->comm.gossip.weight += msg.weight; + send_ack(ident, remote, msg.seqno); + } else if (msg.seqno > remote->incoming.seen_seqno) { + /* Only some of the message is old news. */ + double diff_value = msg.value - remote->incoming.saved_value; + double diff_weight = msg.weight - remote->incoming.saved_weight; + + remote->incoming.seen_seqno = msg.seqno; + remote->incoming.saved_value = msg.value; + remote->incoming.saved_weight = msg.weight; + + ident->comm.gossip.value += diff_value; + ident->comm.gossip.weight += diff_weight; + send_ack(ident, remote, msg.seqno); + } else { + /* The entire message is old news. (Duplicate). */ + /* Do nothing. */ + } + } + } + break; + + default: { + printlog(LOG_CRITICAL, "ERR: Unknown identity comm fabric.\n"); + } + } + + pthread_mutex_unlock(&ident->comm.lock); + pthread_rwlock_unlock(&limiter.limiter_lock); +} + +#if 0 +static void limiter_accept(comm_limiter_t *limiter) { + int sock, result; + struct sockaddr_in fromaddr; + socklen_t fromlen = sizeof(fromaddr); + remote_node_t sender; + remote_limiter_t *remote; + hello_t hello; + comm_ident_t *ident; + ident_handle *handle = NULL; + + sock = accept(limiter->tcp_socket, (struct sockaddr *)&fromaddr, &fromlen); + + assert(sock > 0); + + memset(&hello, 0, sizeof(hello_t)); + result = recv(sock, &hello, sizeof(hello_t), 0); + + if (result < 0) { + close(sock); + return; /* Failure - ignore it. */ + } + + assert(result == sizeof(hello_t)); + + hello_to_hbo(&hello); + + assert(hello.magic == MAGIC_HELLO); + + memset(&sender, 0, sizeof(remote_node_t)); + sender.addr = fromaddr.sin_addr.s_addr; + sender.port = ntohs(hello.port); + + pthread_testcancel(); + + pthread_rwlock_rdlock(&limiter->rwlock); + + handle = map_search(limiter->ident_id_to_handle, (void *) &hello.ident_id, sizeof(hello.ident_id)); + + if (handle == NULL) { + printlog(LOG_WARN, "WARN:recvd hello for unknown identity.\n"); + pthread_rwlock_unlock(&limiter->rwlock); + return; + } + + ident = limiter->identities[*handle]; + assert(ident != NULL); + + pthread_mutex_lock(&ident->lock); + + remote = map_search(ident->remote_node_map, &sender, sizeof(remote_node_t)); + + if (remote == NULL) { + printlog(LOG_WARN, "WARN: Accepted connection from unknown identity.\n"); + pthread_mutex_unlock(&ident->lock); + pthread_rwlock_unlock(&limiter->rwlock); + close(sock); + return; + } + + if (is_connected(remote)) { + /* We are still connected, don't need the new socket. */ + close(sock); + pthread_mutex_unlock(&ident->lock); + pthread_rwlock_unlock(&limiter->rwlock); + return; + } + + /* We weren't connected, but we are now... */ + remote->socket = sock; + printf("Got connection on: %d\n", sock); + FD_SET(sock, &ident->fds); + + pthread_mutex_unlock(&ident->lock); + pthread_rwlock_unlock(&limiter->rwlock); +} + +static void read_tcp_message(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock, int sock) { + int result; + message_t msg; + + memset(&msg, 0, sizeof(message_t)); + + result = recv(sock, &msg, sizeof(message_t), 0); + + if (result < 0) { + pthread_rwlock_rdlock(limiter_rwlock); + pthread_mutex_lock(&ident->lock); + FD_CLR(sock, &ident->fds); + close(sock); + pthread_mutex_unlock(&ident->lock); + pthread_rwlock_unlock(limiter_rwlock); + return; + } + + assert(result == sizeof(message_t)); + + message_to_hbo(&msg); + assert(msg.magic == MAGIC_MSG); + + pthread_rwlock_rdlock(limiter_rwlock); + pthread_mutex_lock(&ident->lock); + + switch (ident->comm_fabric) { + case COMM_GOSSIP: { + ident->gossip.value += msg.value; + ident->gossip.weight += msg.weight; + } + break; + + default: { + assert(1 == 0); /* This case shouldn't happen. Punt for now... */ + } + } + pthread_mutex_unlock(&ident->lock); + pthread_rwlock_unlock(limiter_rwlock); +} + +static void ident_receive(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock) { + int select_result, i; + fd_set fds_copy; + struct timeval timeout; + + FD_ZERO(&fds_copy); + timeout.tv_sec = 15; + timeout.tv_usec = 0; + + pthread_rwlock_rdlock(limiter_rwlock); + pthread_mutex_lock(&ident->lock); + memcpy(&fds_copy, &ident->fds, sizeof(fd_set)); + pthread_mutex_unlock(&ident->lock); + pthread_rwlock_unlock(limiter_rwlock); + + /* mask interrupt signals for this thread? */ + + select_result = select(FD_SETSIZE, &fds_copy, NULL, NULL, &timeout); + + assert(select_result >= 0); + + if (select_result == 0) + return; /* Timed out */ + + for (i = 0; (i < FD_SETSIZE) && select_result; ++i) { + if (FD_ISSET(i, &fds_copy)) { + read_tcp_message(ident, limiter_rwlock, i); + select_result--; + } + } +} +#endif + +int send_udp_mesh(comm_t *comm, uint32_t id, int sock) { + int result = 0; + remote_limiter_t *remote; + message_t msg; + struct sockaddr_in toaddr; + + memset(&toaddr, 0, sizeof(struct sockaddr_in)); + toaddr.sin_family = AF_INET; + + memset(&msg, 0, sizeof(message_t)); + msg.magic = MAGIC_MSG; + msg.ident_id = id; + msg.value = comm->local_rate; + /* Do we want seqnos for mesh? We can get by without them. */ + + message_to_nbo(&msg); + + /* Iterate though and send update to all remote limiters in our identity. */ + map_reset_iterate(comm->remote_node_map); + while ((remote = map_next(comm->remote_node_map))) { + toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */ + toaddr.sin_port = remote->port; + if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) { + printlog(LOG_CRITICAL, "ERR: limiter_send_mesh: sento failed.\n"); + result = errno; + printlog(LOG_CRITICAL, " - The error was |%d|\n", strerror(result)); + break; + } + } + + return result; +} + +int send_udp_gossip(comm_t *comm, uint32_t id, int sock) { + int i, j, targetid; + int result = 0; + remote_limiter_t *remote; + struct sockaddr_in toaddr; + double msg_value, msg_weight; + + memset(&toaddr, 0, sizeof(struct sockaddr_in)); + toaddr.sin_family = AF_INET; + + msg_value = comm->gossip.value / (comm->gossip.gossip_branch + 1); + msg_weight = comm->gossip.weight / (comm->gossip.gossip_branch + 1); + + for (i = 0; i < comm->gossip.gossip_branch; ++i) { + message_t msg; + + if (comm->retrys[i] >= 0) { + remote = &comm->remote_limiters[comm->retrys[i]]; + targetid = comm->retrys[i]; + //printf("%d:d:%d, ", i, comm->retrys[i]); + } else { + targetid = -2; + + while (targetid == -2) { + targetid = myrand() % comm->remote_node_count; + + for (j = 0; j < comm->gossip.gossip_branch; ++j) { + if (targetid == comm->retrys[j]) { + targetid = -2; + break; + } + } + } + + remote = &comm->remote_limiters[targetid]; + //printf("%d:r:%d, ", i, targetid); + } + + toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */ + toaddr.sin_port = remote->port; + + memset(&msg, 0, sizeof(message_t)); + msg.magic = MAGIC_MSG; + msg.ident_id = id; + msg.value = msg_value + remote->outgoing.saved_value; + msg.weight = msg_weight + remote->outgoing.saved_weight; + msg.seqno = remote->outgoing.next_seqno; + msg.min_seqno = remote->outgoing.first_seqno; + msg.type = MSG; + + remote->outgoing.next_seqno++; + remote->outgoing.saved_value += msg_value; + remote->outgoing.saved_weight += msg_weight; + + message_to_nbo(&msg); + + if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) { + printlog(LOG_CRITICAL, "ERR: limiter_send_gossip: sento failed.\n"); + result = errno; + break; + } + + comm->retrys[i] = targetid; + } + //printf("\n"); + + comm->gossip.value = msg_value; + comm->gossip.weight = msg_weight; + + return result; +} + +#if 0 +int send_tcp_gossip(comm_ident_t *ident, FILE *logfile, int unused) { + int i, targetid, sock; + int result = 0; + message_t msg; + + memset(&msg, 0, sizeof(message_t)); + msg.magic = MAGIC_MSG; + msg.ident_id = ident->ident_id; + msg.value = ident->gossip.value / (ident->gossip.gossip_branch + 1); + msg.weight = ident->gossip.weight / (ident->gossip.gossip_branch + 1); + + message_to_nbo(&msg); + + for (i = 0; i < ident->gossip.gossip_branch; ++i) { + targetid = myrand() % ident->remote_node_count; + sock = ident->remote_limiters[targetid].socket; + + result = send(sock, &msg, sizeof(message_t), 0); + if (result < 0) { + result = errno; + FD_CLR(sock, &ident->fds); + close(sock); + break; + } + + assert(result == sizeof(message_t)); + } + + ident->gossip.value /= (ident->gossip.gossip_branch + 1); + ident->gossip.weight /= (ident->gossip.gossip_branch + 1); + + return result; +} +#endif + +#if 0 +void *limiter_accept_thread(void *limiter) { + sigset_t signal_mask; + + sigemptyset(&signal_mask); + sigaddset(&signal_mask, SIGHUP); + pthread_sigmask(SIG_BLOCK, &signal_mask, NULL); + + pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); + while (1) { + limiter_accept((comm_limiter_t *) limiter); + } + pthread_exit(NULL); +} +void *ident_receive_thread(void *recv_args) { + int i, sock, result; + struct recv_thread_args *args = (struct recv_thread_args *) recv_args; + comm_ident_t *ident = args->ident; + pthread_rwlock_t *lock = args->lock; + uint16_t port = args->port; + struct sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + hello_t hello; + + free(args); + + while (1) { + memset(&hello, 0, sizeof(hello_t)); + + /*Try to connect to all remote nodes if they aren't already connected.*/ + pthread_rwlock_rdlock(lock); + pthread_mutex_lock(&ident->lock); + + hello.magic = MAGIC_HELLO; + hello.ident_id = ident->ident_id; + hello.port = ntohs(port); + + hello_to_nbo(&hello); + + for (i = 0; i < ident->remote_node_count; ++i) { + if (is_connected(&ident->remote_limiters[i])) + continue; /* Ignore if it's already connected */ + + sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (sock < 0) { + perror("socket"); + continue; + } + + assert(sock >= 0); + + memset(&addr, 0, sizeof(struct sockaddr_in)); + addr.sin_family = AF_INET; + addr.sin_port = ident->remote_limiters[i].port; + addr.sin_addr.s_addr = ident->remote_limiters[i].addr; + + result = connect(sock, (struct sockaddr *) &addr, addrlen); + if (result < 0) { + close(sock); + continue; + } + + result = send(sock, &hello, sizeof(hello_t), 0); + if (result < 0) { + close(sock); + continue; + } + + assert(result == sizeof(hello_t)); + + ident->remote_limiters[i].socket = sock; + printf("Connected on socket: %d\n", sock); + FD_SET(sock, &ident->fds); + } + + pthread_rwlock_unlock(lock); + pthread_mutex_unlock(&ident->lock); + + ident_receive(ident, lock); + } + pthread_exit(NULL); +} +#endif