/* See the DRL-LICENSE file for this file's software license. */ #include /* Debug output. */ #include #include /* Socket functions. */ #include #include /* Byte ordering and address structures. */ #include /* memset() */ #include #include "raterouter.h" #include "ratetypes.h" #include "drl_state.h" #include "peer_comm.h" #include "swim.h" #include "logging.h" /* From ulogd_DRL.c */ extern int do_partition; extern int partition_set; extern limiter_t limiter; /**Finds the update, if found then frees the memory of the new_update * and returns 1. If find fails then this returns 0*/ static int find_and_update(update_t *updates, update_t *new_update) { if( updates == NULL ) { printlog(LOG_DEBUG, "SWIM: INFECT: no existing updates\n"); return 0; } update_t *pointer = updates; while(pointer != NULL) { if(pointer->remote == new_update->remote && pointer->remote->incarnation >= new_update->remote->incarnation) { pointer->count = 0; printlog(LOG_DEBUG, "SWIM: INFECT: update already exists\n"); free(new_update); return 1; } pointer = pointer->next; } printlog(LOG_DEBUG, "SWIM: INFECT: update not found among existing updates\n"); return 0; } /*Just adds to the end of list and returns the head*/ update_t *add_to_list(update_t *updates, update_t *new_update) { printlog(LOG_DEBUG, "SWIM: INFECT: adding to list of updates: %s is %d\n", inet_ntoa(*(struct in_addr *)&new_update->remote->addr), new_update->remote->reachability); update_t *head = updates; update_t *pointer; if (head == NULL) { head = new_update; } else { pointer = head; while(pointer->next != NULL) { pointer = pointer->next; } pointer->next = new_update; } return head; } /** Given the address of the suspected node this function * identifies friends who can probe the suspected node * After recording these, the messages for help are sent * in the next gossip round*/ static int help_from_friends(comm_t *comm, int suspect_index, uint32_t id, int sock) { printlog(LOG_DEBUG,"SWIM: In function help_from_friends suspected node: %s, index: %d\n",inet_ntoa(*(struct in_addr *)&comm->remote_limiters[suspect_index].addr), suspect_index); int i=0, j = 0; int result; int count_friends = (comm->remote_node_count > MAX_FRIENDS ) ? MAX_FRIENDS : comm->remote_node_count; // A more logical way? // remote_node_t friend_nodes[count_friends]; // int friend_ids[count_friends]; while( (i - j) < count_friends && i < comm->remote_node_count ) { // Do not pick a friend who is suspected to be down if (comm->remote_limiters[i].reachability != REACHABLE) { j++; i++; continue; } /**construct the message and send it to friend * pick up friend i*/ message_t check_msg; memset(&check_msg, 0, sizeof(message_t)); check_msg.magic = MAGIC_MSG; check_msg.ident_id = id; check_msg.value = 0; check_msg.weight = 0; check_msg.seqno = 0; check_msg.min_seqno = 0; check_msg.type = CHECK; check_msg.check_target = comm->remote_limiters[suspect_index].addr; check_msg.check_port = comm->remote_limiters[suspect_index].port; // send the message struct sockaddr_in toaddr; memset(&toaddr, 0, sizeof(struct sockaddr_in)); toaddr.sin_family = AF_INET; toaddr.sin_addr.s_addr = comm->remote_limiters[i].addr; toaddr.sin_port = comm->remote_limiters[i].port; message_to_nbo(&check_msg); printlog(LOG_DEBUG,"SWIM: Sending CHECK message to friend %s i: %d", inet_ntoa(*(struct in_addr *)&toaddr.sin_addr.s_addr), i); printlog(LOG_DEBUG," Suspect: %s", inet_ntoa(*(struct in_addr *)&check_msg.check_target)); printlog(LOG_DEBUG," Initial: %s suspect_index: %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[suspect_index].addr), suspect_index); if (sendto(sock, &check_msg, sizeof(check_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) { printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n"); result = errno; printlog(LOG_WARN, " - The error was |%d|\n", strerror(result)); } i++; } printlog(LOG_DEBUG,"SWIM: Out function help_from_friends.\n"); return 0; } /** Receiving CHECK packet*/ static void swim_receive_check(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) { #ifdef ALLOW_PARTITION int id; for(id = 0; id < comm->remote_node_count; id++) { if(comm->remote_limiters[id].addr == remote->addr && comm->remote_limiters[id].port == remote->port) { if (do_partition && ((partition_set & (1 << id)) == 0)) { printlog(LOG_DEBUG, "SWIM: Ignoring CHECK message from %d\n", id); return; } } } #endif //FIX if(remote->reachability != REACHABLE) return; swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state; // create the message that has to be sent to the suspected node printlog(LOG_DEBUG,"SWIM: received CHECK message from %s", inet_ntoa(*(struct in_addr *)&remote->addr)); printlog(LOG_DEBUG,", sending PING to %s\n", inet_ntoa(*(struct in_addr *)&msg->check_target)); int result; message_t ping_msg; memset(&ping_msg, 0, sizeof(message_t)); ping_msg.magic = MAGIC_MSG; ping_msg.ident_id = msg->ident_id; ping_msg.value = 0; ping_msg.weight = 0; ping_msg.seqno = 0; ping_msg.min_seqno = 0; ping_msg.type = PING; ping_msg.ping_source = remote->addr; ping_msg.ping_port = remote->port; // send the ping message struct sockaddr_in toaddr; memset(&toaddr, 0, sizeof(struct sockaddr_in)); toaddr.sin_family = AF_INET; toaddr.sin_addr.s_addr = msg->check_target; toaddr.sin_port = msg->check_port; message_to_nbo(&ping_msg); /** add to ping targets before sending message */ ping_target_t *suspect = (ping_target_t*) malloc(sizeof(ping_target_t)); memset(suspect, 0, sizeof(ping_target_t)); suspect->target.addr = msg->check_target; suspect->target.port = msg->check_port; suspect->source.addr = remote->addr; suspect->source.port = remote->port; printlog(LOG_DEBUG, "SWIM: adding %s to PING list\n", inet_ntoa(*(struct in_addr *)&suspect->target.addr)); ping_target_t *pointer = swim_comm->ping_targets; if( swim_comm->ping_targets != NULL ) { while(pointer->next != NULL) { pointer = pointer->next; } pointer->next = suspect; } else { swim_comm->ping_targets = suspect; } /** added to the end of the list of ping_targets */ if (sendto(limiter.udp_socket, &ping_msg, sizeof(ping_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) { printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n"); result = errno; printlog(LOG_WARN, " - The error was |%d|\n", strerror(result)); } else { printlog(LOG_DEBUG,"SWIM: Sent PING message\n"); } printlog(LOG_DEBUG,"SWIM: Processed CHECK packet\n"); return; } /** Receiving PING packet*/ static void swim_receive_ping(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) { #ifdef ALLOW_PARTITION int id; for(id = 0; id < comm->remote_node_count; id++) { if(comm->remote_limiters[id].addr == remote->addr && comm->remote_limiters[id].port == remote->port) { if (do_partition && ((partition_set & (1 << id)) == 0)) { printlog(LOG_DEBUG, "SWIM: Ignoring PING message from %d\n", id); return; } } } #endif printlog(LOG_DEBUG,"SWIM: receiving the PING message from %s\n", inet_ntoa(*(struct in_addr *)&remote->addr)); //FIX if(remote->reachability != REACHABLE) return; int result; swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state; message_t pingack_msg; memset(&pingack_msg, 0, sizeof(message_t)); pingack_msg.magic = MAGIC_MSG; pingack_msg.ident_id = msg->ident_id; pingack_msg.value = 0; pingack_msg.weight = 0; pingack_msg.seqno = 0; pingack_msg.min_seqno = 0; pingack_msg.type = PING_ACK; swim_comm->incarnation++; pingack_msg.update_present = TRUE; pingack_msg.reachability = REACHABLE; pingack_msg.incarnation = swim_comm->incarnation; FILE *fp = fopen("/root/incarnation", "w+"); fprintf(fp, "%d", swim_comm->incarnation + 1); fflush(fp); fclose(fp); // send PING_ACK struct sockaddr_in toaddr; memset(&toaddr, 0, sizeof(struct sockaddr_in)); toaddr.sin_family = AF_INET; toaddr.sin_addr.s_addr = remote->addr; toaddr.sin_port = remote->port; message_to_nbo(&pingack_msg); if (sendto(limiter.udp_socket, &pingack_msg, sizeof(pingack_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) { printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n"); result = errno; printlog(LOG_WARN, " - The error was |%d|\n", strerror(result)); } else { printlog(LOG_DEBUG, "SWIM: sent PING_ACK to %s\n", inet_ntoa(*(struct in_addr *)&remote->addr)); } printlog(LOG_DEBUG,"SWIM: Processed PING packet\n"); return; } /** Receiving PING_ACK packet*/ static void swim_receive_pingack(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) { // find the source which requested this ping and inform it with CHECK_ACK, ALIVE // look up in the ping_targets list. printlog(LOG_DEBUG, "SWIM: receiving the PING_ACK message from %s\n", inet_ntoa(*(struct in_addr *)&remote->addr)); flushlog(); int result, confirm; int delete_head = 0; ping_target_t* pointer; ping_target_t* prev_pointer; swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state; pointer = swim_comm->ping_targets; prev_pointer = swim_comm->ping_targets; /* * Removed this because if a PING_ACK arrives then CHECK_ACK would be * sent to all the nodes which requested a check on this node. Hence pointer * could be NULL but we could receive a CHECK_ACK packet * if (pointer == NULL) { printlog(LOG_DEBUG, "SWIM: Received PING_ACK for a PING not sent\n"); return; } */ while(pointer!=NULL) { if(pointer->target.addr == remote->addr && pointer->target.port == remote->port) { // suspect has been found in the list // now construct the CHECK_ACK message and send it to source message_t checkack_msg; memset(&checkack_msg, 0, sizeof(message_t)); checkack_msg.magic = MAGIC_MSG; checkack_msg.ident_id = msg->ident_id; checkack_msg.value = 0; checkack_msg.weight = 0; checkack_msg.seqno = 0; checkack_msg.min_seqno = 0; checkack_msg.type = CHECK_ACK; checkack_msg.checkack_value = ALIVE; // inform the source of the addr and port of suspected node checkack_msg.check_target = remote->addr; checkack_msg.check_port = remote->port; struct sockaddr_in toaddr; memset(&toaddr, 0, sizeof(struct sockaddr_in)); // found source toaddr.sin_family = AF_INET; toaddr.sin_addr.s_addr = pointer->source.addr; toaddr.sin_port = pointer->source.port; message_to_nbo(&checkack_msg); if (sendto(limiter.udp_socket, &checkack_msg, sizeof(checkack_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) { printlog(LOG_WARN, "WARN: swim_receive_pingack : sento failed.\n"); result = errno; printlog(LOG_WARN, " - The error was |%d|\n", strerror(result)); } /** Now delete this suspect from friends list of nodes*/ if(prev_pointer == pointer) { swim_comm->ping_targets = pointer->next; pointer->next = NULL; free(pointer); pointer = swim_comm->ping_targets; delete_head = 1; } else { prev_pointer->next = pointer->next; pointer->next = NULL; free(pointer); pointer = prev_pointer; } confirm = 1; } prev_pointer = pointer; if(pointer != NULL && delete_head != 1) { pointer = pointer->next; } delete_head = 0; printf("SWIM: PING ACK\n"); } // PING_ACK has been received then add to the list of updates remote_node_t sender; memset(&sender, 0, sizeof(remote_node_t)); sender.addr = remote->addr; sender.port = remote->port; update_t* new_update = (update_t *) malloc(sizeof(update_t)); memset(new_update, 0, sizeof(update_t)); new_update->remote = map_search(comm->remote_node_map, &sender, sizeof(remote_node_t)); if(new_update->remote == NULL) { printlog(LOG_DEBUG, "SWIM: PANIC: PING_ACK received from an unknown node %s\n",inet_ntoa(*(struct in_addr *)&sender.addr)); } new_update->count = 0; if(msg->incarnation > new_update->remote->incarnation) { new_update->remote->incarnation = msg->incarnation; new_update->remote->reachability = REACHABLE; new_update->remote->awol = 0; new_update->remote->count_rounds = 0; new_update->remote->count_awol = 0; new_update->remote->count_alive = 0; if( find_and_update(swim_comm->updates, new_update) == 0 ) { swim_comm->updates = add_to_list(swim_comm->updates, new_update); swim_comm->count_updates++; } } else if(msg->incarnation == new_update->remote->incarnation) { // if the node previously thought that sender was down then it prevails // else if the node thought it was up then there is no change } if(confirm != 1) printlog(LOG_DEBUG,"SWIM: PING_ACK did not match entries in list\n"); printlog(LOG_DEBUG,"SWIM: Processed PING_ACK packet\n"); return; } /** Receiving CHECK_ACK packet*/ static void swim_receive_checkack(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) { printlog(LOG_DEBUG, "SWIM: receiving the CHECK_ACK message from %s\n", inet_ntoa(*(struct in_addr *)&remote->addr)); int i; for( i = 0; i < comm->remote_node_count; i++) { if(comm->remote_limiters[i].addr == msg->check_target && comm->remote_limiters[i].port == msg->check_port) { if(msg->checkack_value == ALIVE) comm->remote_limiters[i].count_alive++; else if (msg->checkack_value == AWOL) comm->remote_limiters[i].count_awol++; } } printlog(LOG_DEBUG,"SWIM: Processed CHECK_ACK packet\n"); return; } static int swim_send(comm_t *comm, int id, int sock) { int i, result; swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state; /**SOURCE: Send messages to friends to check on * nodes which are suspected to be down*/ for(i = 0; i < comm->remote_node_count; i++) { printlog(LOG_DEBUG, "SWIM: AWOL count of %s is %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), comm->remote_limiters[i].awol); if(comm->remote_limiters[i].awol == GOSSIP_REMOTE_AWOL_THRESHOLD) { // HACK to make sure this part of code is entered only once comm->remote_limiters[i].reachability = SUSPECT; comm->remote_limiters[i].awol++; help_from_friends(comm, i, id, sock); } } /**SOURCE: Count number of rounds since the node has been suspected * If in this process the count reaches threshold then take action */ for (i = 0; i < comm->remote_node_count; i++) { if(comm->remote_limiters[i].reachability == SUSPECT) { comm->remote_limiters[i].count_rounds++; printlog(LOG_DEBUG, "SWIM: ROUNDS count on %s index %d is %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), i, comm->remote_limiters[i].count_rounds); if(comm->remote_limiters[i].count_rounds > SOURCE_THRESHOLD) { if(comm->remote_limiters[i].count_alive > 0) { printlog(LOG_DEBUG,"SWIM: the node %s was up, wrongly suspected\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr)); comm->remote_limiters[i].reachability = REACHABLE; comm->remote_limiters[i].count_rounds = 0; comm->remote_limiters[i].count_awol = 0; comm->remote_limiters[i].count_alive = 0; // FIX comm->remote_limiters[i].awol = 0; } else if (comm->remote_limiters[i].count_awol > 0) { comm->remote_limiters[i].reachability = UNREACHABLE; update_t* new_update = (update_t *) malloc(sizeof(update_t)); memset(new_update, 0, sizeof(update_t)); new_update->remote = &comm->remote_limiters[i]; new_update->count = 0; // comm->remote_limiters[i].incoming.seen_seqno = 0; printlog(LOG_DEBUG, "SWIM: INFECT: Update down information %s\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr)); if(find_and_update(swim_comm->updates, new_update) == 0) { swim_comm->updates = add_to_list(swim_comm->updates, new_update); swim_comm->count_updates++; } printlog(LOG_DEBUG,"SWIM: The node %s is down. reachability: %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), comm->remote_limiters[i].reachability);// The node is check_list.target } else { /**Even friends have not responded, request for help from more friends?*/ printlog(LOG_DEBUG,"SWIM: Last ditch effort, even friends did not respond\n"); comm->remote_limiters[i].reachability = SUSPECT; comm->remote_limiters[i].count_rounds = 0; // CHECK help_from_friends(comm, i, id, sock); } } } } /**Actions performed by "FRIEND"*/ // DELETE THIS LOOP ping_target_t* ping_list = swim_comm->ping_targets; while(ping_list != NULL) { printlog(LOG_DEBUG, "SWIM: in PING list %s\n", inet_ntoa(*(struct in_addr *)&ping_list->target.addr)); ping_list = ping_list->next; } ping_list = swim_comm->ping_targets; ping_target_t* ping_list_prev = swim_comm->ping_targets; int delete_head = 0; while(ping_list != NULL) { ping_list->count++; printlog(LOG_DEBUG,"SWIM: friend keeping track of gossip rounds since PING.\n"); // if in this process some node hits threshold then // send AWOL and delete it from this list if(ping_list->count >= FRIEND_THRESHOLD) { printlog(LOG_DEBUG,"SWIM: friend declaring AWOL.\n"); message_t checkack_msg; memset(&checkack_msg, 0, sizeof(message_t)); checkack_msg.magic = MAGIC_MSG; checkack_msg.ident_id = id; checkack_msg.value = 0; checkack_msg.weight = 0; checkack_msg.seqno = 0; checkack_msg.min_seqno = 0; checkack_msg.type = CHECK_ACK; checkack_msg.checkack_value = AWOL; // inform the source of the addr and port of suspected node checkack_msg.check_target = ping_list->target.addr; checkack_msg.check_port = ping_list->target.port; struct sockaddr_in toaddr; memset(&toaddr, 0, sizeof(struct sockaddr_in)); toaddr.sin_family = AF_INET; toaddr.sin_addr.s_addr = ping_list->source.addr; toaddr.sin_port = ping_list->source.port; message_to_nbo(&checkack_msg); if (sendto(sock, &checkack_msg, sizeof(checkack_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) { printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n"); result = errno; printlog(LOG_WARN, " - The error was |%d|\n", strerror(result)); } // now the deletion: after deletion we want to continue from next node printlog(LOG_DEBUG, "SWIM: deleting from PING list %s\n", inet_ntoa(*(struct in_addr *)&ping_list->target.addr)); if(ping_list_prev == ping_list) { swim_comm->ping_targets = ping_list->next; ping_list->next = NULL; free(ping_list); ping_list = swim_comm->ping_targets; delete_head = 1; } else { ping_list_prev->next = ping_list->next; ping_list->next = NULL; free(ping_list); ping_list = ping_list_prev; } } ping_list_prev = ping_list; if (ping_list != NULL && delete_head != 1) ping_list = ping_list->next; delete_head = 0; } return 0; } /** Handle SWIM packets received*/ int swim_receive(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) { swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state; if (msg->type == ACK) { /* If ACK was received then reset the awol count */ if (msg->seqno == remote->outgoing.next_seqno - 1) { /* Ack for most recent message. Clear saved state. */ remote->outgoing.first_seqno = remote->outgoing.next_seqno; remote->outgoing.saved_value = 0; remote->outgoing.saved_weight = 0; remote->awol = 0; remote->count_awol = 0; } /* Ignore ack if it isn't for most recent message. */ } else if (msg->type == MSG) { if (msg->min_seqno > remote->incoming.seen_seqno) { /* Entirely new information */ remote->incoming.seen_seqno = msg->seqno; remote->incoming.saved_value = msg->value; remote->incoming.saved_weight = msg->weight; comm->gossip.value += msg->value; comm->gossip.weight += msg->weight; send_ack(id, remote, msg->seqno, ACK, comm->gossip.view); remote->awol = 0; remote->count_rounds = 0; remote->count_awol = 0; remote->count_alive = 0; // check if there is an update piggy backed on this message // if yes then add it to the update list if(msg->update_present > 0) { update_t *new_update = (update_t *) malloc(sizeof(update_t)); memset(new_update, 0, sizeof(update_t)); // look for the remote limiter about whom update is being // sent the update node is sent in the message, msg->node! remote_limiter_t *temp_remote = map_search(comm->remote_node_map, &msg->node, sizeof(remote_node_t)); // an update about the node itself is possible in which case map_search would fail if(temp_remote != NULL) { printlog(LOG_DEBUG, "SWIM: INFECT: Receiving update about %s. Update says %d\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr), msg->reachability); new_update->remote = temp_remote; new_update->count = 0; if(msg->incarnation > new_update->remote->incarnation) { printlog(LOG_DEBUG, "SWIM: INFECT: Receiving update about %s. Update is about new incarnation\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr)); new_update->remote->reachability = msg->reachability; new_update->remote->incarnation = msg->incarnation; if(find_and_update(swim_comm->updates, new_update) == 0) { swim_comm->updates = add_to_list(swim_comm->updates, new_update); swim_comm->count_updates++; } } else if(msg->incarnation == new_update->remote->incarnation && new_update->remote->reachability == REACHABLE && msg->reachability == UNREACHABLE) { printlog(LOG_DEBUG, "SWIM: INFECT: Receiving update about %s. Update about same incarnation, says node unreachable\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr)); new_update->remote->reachability = msg->reachability; if(find_and_update(swim_comm->updates, new_update) == 0) { swim_comm->updates = add_to_list(swim_comm->updates, new_update); swim_comm->count_updates++; } else { // Ignore the update printlog(LOG_DEBUG, "SWIM: INFECT: update about %s ignored\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr)); } } } } } else if (msg->seqno > remote->incoming.seen_seqno) { /* Only some of the message is old news. */ double diff_value = msg->value - remote->incoming.saved_value; double diff_weight = msg->weight - remote->incoming.saved_weight; remote->incoming.seen_seqno = msg->seqno; remote->incoming.saved_value = msg->value; remote->incoming.saved_weight = msg->weight; comm->gossip.value += diff_value; comm->gossip.weight += diff_weight; send_ack(id, remote, msg->seqno, ACK, comm->gossip.view); remote->awol = 0; remote->count_awol = 0; } else { /* The entire message is old news. (Duplicate). */ /* Do nothing. */ } // Hearing from a node previously declared unreachable if(remote->reachability == UNREACHABLE) { printlog(LOG_DEBUG, "SWIM: INFECT: seems like %s is back up\n", inet_ntoa(*(struct in_addr *)&remote->addr)); remote->reachability = SUSPECT; remote->awol = GOSSIP_REMOTE_AWOL_THRESHOLD; remote->count_rounds = 0; remote->count_awol = 0; remote->count_alive = 0; } } else if(msg->type == CHECK) { swim_receive_check(comm, sock, remote, msg); } else if(msg->type == PING ) { swim_receive_ping(comm, sock, remote, msg); } else if(msg->type == PING_ACK) { swim_receive_pingack(comm, sock, remote, msg); } else if(msg->type == CHECK_ACK) { swim_receive_checkack(comm, sock, remote, msg); } return 0; } int send_gossip_swim(comm_t *comm, uint32_t id, int sock) { int i, j; int retry_index = 0; int result = 0; remote_limiter_t *remote; struct sockaddr_in toaddr; double msg_value, msg_weight; /* This is the factor for the portion of value/weight to keep locally. * Normally this is 1, meaning that we retain the same amount of value/weight * that was sent to the peers. In the case of not being able to send to a * peer though, we increment this to reclaim the value/weight locally. */ int message_portion = 1; memset(&toaddr, 0, sizeof(struct sockaddr_in)); toaddr.sin_family = AF_INET; msg_value = comm->gossip.value / (comm->gossip.gossip_branch + 1); msg_weight = comm->gossip.weight / (comm->gossip.gossip_branch + 1); /*Nodes to which message was sent will have a non-zero here*/ /* for (i = 0; i < comm->remote_node_count; i++) { if(comm->remote_limiters[i].awol > 0) comm->remote_limiters[i].awol++; }*/ for (i = 0; i < comm->remote_node_count; i++) { printlog(LOG_DEBUG, "SWIM: STATUS: Node: %s reachability: %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), comm->remote_limiters[i].reachability); } for (i = 0; i < comm->gossip.gossip_branch; ++i) { int targetid = NULL_PEER; int rand_count = 0; message_t msg; printlog(LOG_DEBUG, "Gossip loop iteration, i=%d, branch=%d\n", i, comm->gossip.gossip_branch); /* If there are any peers with unacked messages, select them first. */ while (retry_index < comm->remote_node_count) { if (comm->remote_limiters[retry_index].awol > 0 && comm->remote_limiters[retry_index].reachability == REACHABLE) { targetid = retry_index; printlog(LOG_DEBUG, "GOSSIP: Selected peerindex %d because it had unacked messages.\n", targetid); retry_index += 1; break; } retry_index += 1; } while (targetid == NULL_PEER && rand_count < 50) { /* *Gossip node would be chosen from * the array which would be randomly shuffled * once all the nodes have been sent messages* */ targetid = find_gossip_target(comm); /* If we didn't find any peers the needed retransmissions, select one randomly here. */ /* targetid = myrand() % comm->remote_node_count; rand_count += 1; */ /* Don't select an already-used index. */ for (j = 0; j < i; ++j) { if (targetid == comm->selected[j]) { printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d. selected[j=%d] is %d\n", targetid, j, comm->selected[j]); targetid = NULL_PEER; break; } } if (targetid != NULL_PEER) { if(comm->remote_limiters[targetid].reachability != REACHABLE) { printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d. reachability is %d, and remote awol count is %d\n", targetid, comm->remote_limiters[targetid].reachability, comm->remote_limiters[targetid].awol); targetid = NULL_PEER; } } rand_count++; } if (targetid < 0) { /* Couldn't find a suitable peer to send to... */ message_portion += 1; printlog(LOG_DEBUG, "GOSSIP: exhausted random peer search.\n"); continue; } else { printlog(LOG_DEBUG, "GOSSIP: settled on peer id %d.\n", targetid); } remote = &comm->remote_limiters[targetid]; comm->selected[i] = targetid; toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */ toaddr.sin_port = remote->port; memset(&msg, 0, sizeof(message_t)); msg.magic = MAGIC_MSG; msg.ident_id = id; msg.value = msg_value + remote->outgoing.saved_value; msg.weight = msg_weight + remote->outgoing.saved_weight; msg.seqno = remote->outgoing.next_seqno; msg.min_seqno = remote->outgoing.first_seqno; msg.type = MSG; msg.view = comm->gossip.view; if(comm->gossip.membership == SWIM) { swim_comm_t *swim_comm = (swim_comm_t *)comm->membership_state; // piggy back an update if(swim_comm->count_updates > 0) { int index = myrand() % swim_comm->count_updates; update_t *pointer = swim_comm->updates; update_t *prev_pointer = swim_comm->updates; while(index != 0) { prev_pointer = pointer; pointer = pointer->next; index--; } msg.update_present = TRUE; msg.reachability = pointer->remote->reachability; msg.incarnation = pointer->remote->incarnation; msg.node.addr = pointer->remote->addr; msg.node.port = pointer->remote->port; printlog(LOG_DEBUG, "SWIM: Sending update about %s\n", inet_ntoa(*(struct in_addr *)&pointer->remote->addr)); pointer->count++; if(swim_comm->updates != pointer && pointer->count == UPDATE_THRESHOLD) { // pointer is not head prev_pointer->next = pointer->next; pointer->next = NULL; free(pointer); swim_comm->count_updates--; } else if(pointer->count == UPDATE_THRESHOLD) { // pointer is head of the list swim_comm->updates = pointer->next; pointer->next = NULL; free(pointer); swim_comm->count_updates--; if(swim_comm->count_updates == 0) { swim_comm->updates = NULL; } } } } remote->outgoing.next_seqno++; remote->outgoing.saved_value += msg_value; remote->outgoing.saved_weight += msg_weight; /* Represents the number of messages we have sent to this peer without hearing a message or ACK from it. */ remote->awol += 1; #ifdef ALLOW_PARTITION if (do_partition && ((partition_set & (1 << targetid)) == 0)) { printlog(LOG_DEBUG, "Partition: GOSSIP: ignoring targetid %d\n", targetid); continue; } #endif message_to_nbo(&msg); if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) { printlog(LOG_WARN, "WARN: limiter_send_gossip: sento failed.\n"); result = errno; break; } printlog(LOG_DEBUG,"SWIM: sent the gossip to %s\n", inet_ntoa(*(struct in_addr *)&toaddr.sin_addr.s_addr)); } swim_send(comm, id, sock); comm->gossip.value = msg_value * message_portion; comm->gossip.weight = msg_weight * message_portion; return result; } void swim_restart(comm_t *comm, int32_t view_number) { /* Not sure about this yet... */ } int swim_init(comm_t *comm, uint32_t id) { comm->membership_state = malloc(sizeof(swim_comm_t)); if (comm->membership_state == NULL) { return ENOMEM; } comm->connected = 1; comm->recv_function = swim_receive; comm->send_function = send_gossip_swim; comm->restart_function = swim_restart; swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state; FILE *fp = fopen("/root/incarnation", "w+"); fscanf(fp, "%d", &swim_comm->incarnation); fprintf(fp, "%d", swim_comm->incarnation + 1); // next time a greater incarnation would be read fflush(fp); fclose(fp); return 0; } void swim_teardown(comm_t *comm) { if (comm->membership_state) free(comm->membership_state); } /**Bhanu: new functions being introduced*/ /** Given a friend_id and the address of the suspected node * this function sends a CHECK message to friend */ /* int help_from_friend(comm_t* comm, int friend_id, in_addr_t addr, in_port_t port, uint32_t id, int sock) { // send a CHECK message to friend int result; message_t msg; memset(&msg, 0, sizeof(message_t)); msg.magic = MAGIC_MSG; msg.ident_id = id; msg.value = 0; msg.weight = 0; msg.seqno = 0; msg.min_seqno = 0; msg.type = CHECK; msg.check_target = addr; msg.check_port = port; // send the message struct sockaddr_in toaddr; memset(&toaddr, 0, sizeof(sockaddr_in)); toaddr.sin_family = AF_INET; toaddr.sin_addr.s_addr = comm->remote_limiters[friend_id].addr; toaddr.sin_port = comm->remote_limiters[friend_id].port; message_to_nbo(&msg); if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) { printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n"); result = errno; printlog(LOG_WARN, " - The error was |%d|\n", strerror(result)); } } */