--- /dev/null
+/* See the DRL-LICENSE file for this file's software license. */
+
+#include <errno.h>
+
+/* Debug output. */
+#include <stdio.h>
+#include <stdlib.h>
+
+/* Socket functions. */
+#include <sys/types.h>
+#include <sys/socket.h>
+
+/* Byte ordering and address structures. */
+#include <arpa/inet.h>
+
+/* memset() */
+#include <string.h>
+
+#include "raterouter.h"
+#include "ratetypes.h"
+#include "drl_state.h"
+#include "peer_comm.h"
+#include "swim.h"
+#include "logging.h"
+
+/* From ulogd_DRL.c */
+extern int do_partition;
+extern int partition_set;
+
+extern limiter_t limiter;
+
+/**Finds the update, if found then frees the memory of the new_update
+ * and returns 1. If find fails then this returns 0*/
+static int find_and_update(update_t *updates, update_t *new_update) {
+ if( updates == NULL ) {
+ printlog(LOG_DEBUG, "SWIM: INFECT: no existing updates\n");
+ return 0;
+ }
+ update_t *pointer = updates;
+ while(pointer != NULL) {
+ if(pointer->remote == new_update->remote && pointer->remote->incarnation >= new_update->remote->incarnation) {
+ pointer->count = 0;
+ printlog(LOG_DEBUG, "SWIM: INFECT: update already exists\n");
+ free(new_update);
+ return 1;
+ }
+ pointer = pointer->next;
+ }
+ printlog(LOG_DEBUG, "SWIM: INFECT: update not found among existing updates\n");
+ return 0;
+}
+
+/*Just adds to the end of list and returns the head*/
+update_t *add_to_list(update_t *updates, update_t *new_update) {
+ printlog(LOG_DEBUG, "SWIM: INFECT: adding to list of updates: %s is %d\n", inet_ntoa(*(struct in_addr *)&new_update->remote->addr), new_update->remote->reachability);
+ update_t *head = updates;
+ update_t *pointer;
+ if (head == NULL) {
+ head = new_update;
+ } else {
+ pointer = head;
+ while(pointer->next != NULL) {
+ pointer = pointer->next;
+ }
+ pointer->next = new_update;
+ }
+ return head;
+}
+
+/** Given the address of the suspected node this function
+ * identifies friends who can probe the suspected node
+ * After recording these, the messages for help are sent
+ * in the next gossip round*/
+static int help_from_friends(comm_t *comm, int suspect_index, uint32_t id, int sock) {
+ printlog(LOG_DEBUG,"SWIM: In function help_from_friends suspected node: %s, index: %d\n",inet_ntoa(*(struct in_addr *)&comm->remote_limiters[suspect_index].addr), suspect_index);
+ int i=0, j = 0;
+ int result;
+ int count_friends = (comm->remote_node_count > MAX_FRIENDS ) ? MAX_FRIENDS : comm->remote_node_count; // A more logical way?
+ // remote_node_t friend_nodes[count_friends];
+ // int friend_ids[count_friends];
+
+ while( (i - j) < count_friends && i < comm->remote_node_count ) {
+ // Do not pick a friend who is suspected to be down
+ if (comm->remote_limiters[i].reachability != REACHABLE) {
+ j++; i++;
+ continue;
+ }
+ /**construct the message and send it to friend
+ * pick up friend i*/
+ message_t check_msg;
+ memset(&check_msg, 0, sizeof(message_t));
+ check_msg.magic = MAGIC_MSG;
+ check_msg.ident_id = id;
+ check_msg.value = 0;
+ check_msg.weight = 0;
+ check_msg.seqno = 0;
+ check_msg.min_seqno = 0;
+ check_msg.type = CHECK;
+ check_msg.check_target = comm->remote_limiters[suspect_index].addr;
+ check_msg.check_port = comm->remote_limiters[suspect_index].port;
+
+ // send the message
+ struct sockaddr_in toaddr;
+ memset(&toaddr, 0, sizeof(struct sockaddr_in));
+ toaddr.sin_family = AF_INET;
+ toaddr.sin_addr.s_addr = comm->remote_limiters[i].addr;
+ toaddr.sin_port = comm->remote_limiters[i].port;
+ message_to_nbo(&check_msg);
+
+ printlog(LOG_DEBUG,"SWIM: Sending CHECK message to friend %s i: %d", inet_ntoa(*(struct in_addr *)&toaddr.sin_addr.s_addr), i);
+ printlog(LOG_DEBUG," Suspect: %s", inet_ntoa(*(struct in_addr *)&check_msg.check_target));
+ printlog(LOG_DEBUG," Initial: %s suspect_index: %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[suspect_index].addr), suspect_index);
+ if (sendto(sock, &check_msg, sizeof(check_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+ printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
+ result = errno;
+ printlog(LOG_WARN, " - The error was |%d|\n", strerror(result));
+ }
+ i++;
+ }
+ printlog(LOG_DEBUG,"SWIM: Out function help_from_friends.\n");
+ return 0;
+}
+
+/** Receiving CHECK packet*/
+static void swim_receive_check(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) {
+
+#ifdef ALLOW_PARTITION
+ int id;
+ for(id = 0; id < comm->remote_node_count; id++) {
+ if(comm->remote_limiters[id].addr == remote->addr && comm->remote_limiters[id].port == remote->port) {
+ if (do_partition && ((partition_set & (1 << id)) == 0)) {
+ printlog(LOG_DEBUG, "SWIM: Ignoring CHECK message from %d\n", id);
+ return;
+ }
+ }
+ }
+#endif
+
+//FIX
+ if(remote->reachability != REACHABLE)
+ return;
+
+ swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
+
+ // create the message that has to be sent to the suspected node
+ printlog(LOG_DEBUG,"SWIM: received CHECK message from %s", inet_ntoa(*(struct in_addr *)&remote->addr));
+ printlog(LOG_DEBUG,", sending PING to %s\n", inet_ntoa(*(struct in_addr *)&msg->check_target));
+ int result;
+ message_t ping_msg;
+ memset(&ping_msg, 0, sizeof(message_t));
+ ping_msg.magic = MAGIC_MSG;
+ ping_msg.ident_id = msg->ident_id;
+ ping_msg.value = 0;
+ ping_msg.weight = 0;
+ ping_msg.seqno = 0;
+ ping_msg.min_seqno = 0;
+ ping_msg.type = PING;
+ ping_msg.ping_source = remote->addr;
+ ping_msg.ping_port = remote->port;
+ // send the ping message
+ struct sockaddr_in toaddr;
+ memset(&toaddr, 0, sizeof(struct sockaddr_in));
+ toaddr.sin_family = AF_INET;
+ toaddr.sin_addr.s_addr = msg->check_target;
+ toaddr.sin_port = msg->check_port;
+ message_to_nbo(&ping_msg);
+
+ /** add to ping targets before sending message */
+ ping_target_t *suspect = (ping_target_t*) malloc(sizeof(ping_target_t));
+ memset(suspect, 0, sizeof(ping_target_t));
+ suspect->target.addr = msg->check_target;
+ suspect->target.port = msg->check_port;
+ suspect->source.addr = remote->addr;
+ suspect->source.port = remote->port;
+ printlog(LOG_DEBUG, "SWIM: adding %s to PING list\n", inet_ntoa(*(struct in_addr *)&suspect->target.addr));
+
+ ping_target_t *pointer = swim_comm->ping_targets;
+ if( swim_comm->ping_targets != NULL ) {
+ while(pointer->next != NULL) {
+ pointer = pointer->next;
+ }
+ pointer->next = suspect;
+ }
+ else {
+ swim_comm->ping_targets = suspect;
+ }
+ /** added to the end of the list of ping_targets */
+
+ if (sendto(limiter.udp_socket, &ping_msg, sizeof(ping_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+ printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
+ result = errno;
+ printlog(LOG_WARN, " - The error was |%d|\n", strerror(result));
+ } else {
+ printlog(LOG_DEBUG,"SWIM: Sent PING message\n");
+ }
+ printlog(LOG_DEBUG,"SWIM: Processed CHECK packet\n");
+ return;
+}
+
+/** Receiving PING packet*/
+static void swim_receive_ping(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) {
+
+#ifdef ALLOW_PARTITION
+ int id;
+ for(id = 0; id < comm->remote_node_count; id++) {
+ if(comm->remote_limiters[id].addr == remote->addr && comm->remote_limiters[id].port == remote->port) {
+ if (do_partition && ((partition_set & (1 << id)) == 0)) {
+ printlog(LOG_DEBUG, "SWIM: Ignoring PING message from %d\n", id);
+ return;
+ }
+ }
+ }
+#endif
+
+ printlog(LOG_DEBUG,"SWIM: receiving the PING message from %s\n", inet_ntoa(*(struct in_addr *)&remote->addr));
+
+//FIX
+ if(remote->reachability != REACHABLE)
+ return;
+
+ int result;
+ swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
+
+ message_t pingack_msg;
+ memset(&pingack_msg, 0, sizeof(message_t));
+ pingack_msg.magic = MAGIC_MSG;
+ pingack_msg.ident_id = msg->ident_id;
+ pingack_msg.value = 0;
+ pingack_msg.weight = 0;
+ pingack_msg.seqno = 0;
+ pingack_msg.min_seqno = 0;
+ pingack_msg.type = PING_ACK;
+
+ swim_comm->incarnation++;
+ pingack_msg.update_present = TRUE;
+ pingack_msg.reachability = REACHABLE;
+ pingack_msg.incarnation = swim_comm->incarnation;
+ FILE *fp = fopen("/root/incarnation", "w+");
+ fprintf(fp, "%d", swim_comm->incarnation + 1);
+ fflush(fp);
+ fclose(fp);
+ // send PING_ACK
+ struct sockaddr_in toaddr;
+ memset(&toaddr, 0, sizeof(struct sockaddr_in));
+ toaddr.sin_family = AF_INET;
+ toaddr.sin_addr.s_addr = remote->addr;
+ toaddr.sin_port = remote->port;
+ message_to_nbo(&pingack_msg);
+
+ if (sendto(limiter.udp_socket, &pingack_msg, sizeof(pingack_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+ printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
+ result = errno;
+ printlog(LOG_WARN, " - The error was |%d|\n", strerror(result));
+ } else {
+ printlog(LOG_DEBUG, "SWIM: sent PING_ACK to %s\n", inet_ntoa(*(struct in_addr *)&remote->addr));
+ }
+ printlog(LOG_DEBUG,"SWIM: Processed PING packet\n");
+ return;
+}
+
+/** Receiving PING_ACK packet*/
+static void swim_receive_pingack(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) {
+ // find the source which requested this ping and inform it with CHECK_ACK, ALIVE
+ // look up in the ping_targets list.
+ printlog(LOG_DEBUG, "SWIM: receiving the PING_ACK message from %s\n", inet_ntoa(*(struct in_addr *)&remote->addr));
+ flushlog();
+ int result, confirm;
+ int delete_head = 0;
+ ping_target_t* pointer;
+ ping_target_t* prev_pointer;
+ swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
+ pointer = swim_comm->ping_targets;
+ prev_pointer = swim_comm->ping_targets;
+
+/*
+ * Removed this because if a PING_ACK arrives then CHECK_ACK would be
+ * sent to all the nodes which requested a check on this node. Hence pointer
+ * could be NULL but we could receive a CHECK_ACK packet
+ * if (pointer == NULL) {
+ printlog(LOG_DEBUG, "SWIM: Received PING_ACK for a PING not sent\n");
+ return;
+ }
+*/
+ while(pointer!=NULL) {
+ if(pointer->target.addr == remote->addr && pointer->target.port == remote->port) {
+ // suspect has been found in the list
+ // now construct the CHECK_ACK message and send it to source
+ message_t checkack_msg;
+ memset(&checkack_msg, 0, sizeof(message_t));
+ checkack_msg.magic = MAGIC_MSG;
+ checkack_msg.ident_id = msg->ident_id;
+ checkack_msg.value = 0;
+ checkack_msg.weight = 0;
+ checkack_msg.seqno = 0;
+ checkack_msg.min_seqno = 0;
+ checkack_msg.type = CHECK_ACK;
+ checkack_msg.checkack_value = ALIVE;
+ // inform the source of the addr and port of suspected node
+ checkack_msg.check_target = remote->addr;
+ checkack_msg.check_port = remote->port;
+ struct sockaddr_in toaddr;
+ memset(&toaddr, 0, sizeof(struct sockaddr_in));
+ // found source
+ toaddr.sin_family = AF_INET;
+ toaddr.sin_addr.s_addr = pointer->source.addr;
+ toaddr.sin_port = pointer->source.port;
+ message_to_nbo(&checkack_msg);
+
+ if (sendto(limiter.udp_socket, &checkack_msg, sizeof(checkack_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+ printlog(LOG_WARN, "WARN: swim_receive_pingack : sento failed.\n");
+ result = errno;
+ printlog(LOG_WARN, " - The error was |%d|\n", strerror(result));
+ }
+
+ /** Now delete this suspect from friends list of nodes*/
+ if(prev_pointer == pointer) {
+ swim_comm->ping_targets = pointer->next;
+ pointer->next = NULL;
+ free(pointer);
+ pointer = swim_comm->ping_targets;
+ delete_head = 1;
+ } else {
+ prev_pointer->next = pointer->next;
+ pointer->next = NULL;
+ free(pointer);
+ pointer = prev_pointer;
+ }
+ confirm = 1;
+ }
+ prev_pointer = pointer;
+ if(pointer != NULL && delete_head != 1) {
+ pointer = pointer->next;
+ }
+ delete_head = 0;
+ printf("SWIM: PING ACK\n");
+ }
+ // PING_ACK has been received then add to the list of updates
+ remote_node_t sender;
+ memset(&sender, 0, sizeof(remote_node_t));
+ sender.addr = remote->addr;
+ sender.port = remote->port;
+ update_t* new_update = (update_t *) malloc(sizeof(update_t));
+ memset(new_update, 0, sizeof(update_t));
+ new_update->remote = map_search(comm->remote_node_map, &sender, sizeof(remote_node_t));
+ if(new_update->remote == NULL) {
+ printlog(LOG_DEBUG, "SWIM: PANIC: PING_ACK received from an unknown node %s\n",inet_ntoa(*(struct in_addr *)&sender.addr));
+ }
+ new_update->count = 0;
+
+ if(msg->incarnation > new_update->remote->incarnation) {
+ new_update->remote->incarnation = msg->incarnation;
+ new_update->remote->reachability = REACHABLE;
+ new_update->remote->awol = 0;
+ new_update->remote->count_rounds = 0;
+ new_update->remote->count_awol = 0;
+ new_update->remote->count_alive = 0;
+ if( find_and_update(swim_comm->updates, new_update) == 0 ) {
+ swim_comm->updates = add_to_list(swim_comm->updates, new_update);
+ swim_comm->count_updates++;
+ }
+ } else if(msg->incarnation == new_update->remote->incarnation) {
+ // if the node previously thought that sender was down then it prevails
+ // else if the node thought it was up then there is no change
+ }
+
+ if(confirm != 1) printlog(LOG_DEBUG,"SWIM: PING_ACK did not match entries in list\n");
+ printlog(LOG_DEBUG,"SWIM: Processed PING_ACK packet\n");
+ return;
+}
+
+/** Receiving CHECK_ACK packet*/
+static void swim_receive_checkack(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) {
+ printlog(LOG_DEBUG, "SWIM: receiving the CHECK_ACK message from %s\n", inet_ntoa(*(struct in_addr *)&remote->addr));
+ int i;
+ for( i = 0; i < comm->remote_node_count; i++) {
+ if(comm->remote_limiters[i].addr == msg->check_target && comm->remote_limiters[i].port == msg->check_port) {
+ if(msg->checkack_value == ALIVE)
+ comm->remote_limiters[i].count_alive++;
+ else if (msg->checkack_value == AWOL)
+ comm->remote_limiters[i].count_awol++;
+ }
+ }
+ printlog(LOG_DEBUG,"SWIM: Processed CHECK_ACK packet\n");
+ return;
+}
+
+static int swim_send(comm_t *comm, int id, int sock) {
+ int i, result;
+ swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
+
+ /**SOURCE: Send messages to friends to check on
+ * nodes which are suspected to be down*/
+ for(i = 0; i < comm->remote_node_count; i++) {
+ printlog(LOG_DEBUG, "SWIM: AWOL count of %s is %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), comm->remote_limiters[i].awol);
+ if(comm->remote_limiters[i].awol == GOSSIP_REMOTE_AWOL_THRESHOLD) {
+ // HACK to make sure this part of code is entered only once
+ comm->remote_limiters[i].reachability = SUSPECT;
+ comm->remote_limiters[i].awol++;
+ help_from_friends(comm, i, id, sock);
+ }
+ }
+
+ /**SOURCE: Count number of rounds since the node has been suspected
+ * If in this process the count reaches threshold then take action
+ */
+ for (i = 0; i < comm->remote_node_count; i++) {
+ if(comm->remote_limiters[i].reachability == SUSPECT) {
+ comm->remote_limiters[i].count_rounds++;
+ printlog(LOG_DEBUG, "SWIM: ROUNDS count on %s index %d is %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), i, comm->remote_limiters[i].count_rounds);
+ if(comm->remote_limiters[i].count_rounds > SOURCE_THRESHOLD) {
+ if(comm->remote_limiters[i].count_alive > 0) {
+ printlog(LOG_DEBUG,"SWIM: the node %s was up, wrongly suspected\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr));
+ comm->remote_limiters[i].reachability = REACHABLE;
+ comm->remote_limiters[i].count_rounds = 0;
+ comm->remote_limiters[i].count_awol = 0;
+ comm->remote_limiters[i].count_alive = 0;
+ // FIX
+ comm->remote_limiters[i].awol = 0;
+ }
+ else if (comm->remote_limiters[i].count_awol > 0) {
+ comm->remote_limiters[i].reachability = UNREACHABLE;
+ update_t* new_update = (update_t *) malloc(sizeof(update_t));
+ memset(new_update, 0, sizeof(update_t));
+ new_update->remote = &comm->remote_limiters[i];
+ new_update->count = 0;
+ // comm->remote_limiters[i].incoming.seen_seqno = 0;
+ printlog(LOG_DEBUG, "SWIM: INFECT: Update down information %s\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr));
+ if(find_and_update(swim_comm->updates, new_update) == 0) {
+ swim_comm->updates = add_to_list(swim_comm->updates, new_update);
+ swim_comm->count_updates++;
+ }
+ printlog(LOG_DEBUG,"SWIM: The node %s is down. reachability: %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), comm->remote_limiters[i].reachability);// The node is check_list.target
+ }
+ else {
+ /**Even friends have not responded, request for help from more friends?*/
+ printlog(LOG_DEBUG,"SWIM: Last ditch effort, even friends did not respond\n");
+ comm->remote_limiters[i].reachability = SUSPECT;
+ comm->remote_limiters[i].count_rounds = 0; // CHECK
+ help_from_friends(comm, i, id, sock);
+ }
+ }
+ }
+ }
+
+ /**Actions performed by "FRIEND"*/
+ // DELETE THIS LOOP
+ ping_target_t* ping_list = swim_comm->ping_targets;
+ while(ping_list != NULL) {
+ printlog(LOG_DEBUG, "SWIM: in PING list %s\n", inet_ntoa(*(struct in_addr *)&ping_list->target.addr));
+ ping_list = ping_list->next;
+ }
+ ping_list = swim_comm->ping_targets;
+ ping_target_t* ping_list_prev = swim_comm->ping_targets;
+ int delete_head = 0;
+ while(ping_list != NULL) {
+ ping_list->count++;
+ printlog(LOG_DEBUG,"SWIM: friend keeping track of gossip rounds since PING.\n");
+ // if in this process some node hits threshold then
+ // send AWOL and delete it from this list
+ if(ping_list->count >= FRIEND_THRESHOLD) {
+ printlog(LOG_DEBUG,"SWIM: friend declaring AWOL.\n");
+ message_t checkack_msg;
+ memset(&checkack_msg, 0, sizeof(message_t));
+ checkack_msg.magic = MAGIC_MSG;
+ checkack_msg.ident_id = id;
+ checkack_msg.value = 0;
+ checkack_msg.weight = 0;
+ checkack_msg.seqno = 0;
+ checkack_msg.min_seqno = 0;
+ checkack_msg.type = CHECK_ACK;
+ checkack_msg.checkack_value = AWOL;
+ // inform the source of the addr and port of suspected node
+ checkack_msg.check_target = ping_list->target.addr;
+ checkack_msg.check_port = ping_list->target.port;
+ struct sockaddr_in toaddr;
+ memset(&toaddr, 0, sizeof(struct sockaddr_in));
+ toaddr.sin_family = AF_INET;
+ toaddr.sin_addr.s_addr = ping_list->source.addr;
+ toaddr.sin_port = ping_list->source.port;
+ message_to_nbo(&checkack_msg);
+
+ if (sendto(sock, &checkack_msg, sizeof(checkack_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+ printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
+ result = errno;
+ printlog(LOG_WARN, " - The error was |%d|\n", strerror(result));
+ }
+ // now the deletion: after deletion we want to continue from next node
+ printlog(LOG_DEBUG, "SWIM: deleting from PING list %s\n", inet_ntoa(*(struct in_addr *)&ping_list->target.addr));
+ if(ping_list_prev == ping_list) {
+ swim_comm->ping_targets = ping_list->next;
+ ping_list->next = NULL;
+ free(ping_list);
+ ping_list = swim_comm->ping_targets;
+ delete_head = 1;
+ } else {
+ ping_list_prev->next = ping_list->next;
+ ping_list->next = NULL;
+ free(ping_list);
+ ping_list = ping_list_prev;
+ }
+ }
+ ping_list_prev = ping_list;
+ if (ping_list != NULL && delete_head != 1) ping_list = ping_list->next;
+ delete_head = 0;
+ }
+
+ return 0;
+}
+
+
+/** Handle SWIM packets received*/
+int swim_receive(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) {
+ swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
+
+ if (msg->type == ACK) {
+ /* If ACK was received then reset the awol count */
+ if (msg->seqno == remote->outgoing.next_seqno - 1) {
+ /* Ack for most recent message. Clear saved state. */
+ remote->outgoing.first_seqno = remote->outgoing.next_seqno;
+ remote->outgoing.saved_value = 0;
+ remote->outgoing.saved_weight = 0;
+
+ remote->awol = 0;
+ remote->count_awol = 0;
+ }
+ /* Ignore ack if it isn't for most recent message. */
+ } else if (msg->type == MSG) {
+ if (msg->min_seqno > remote->incoming.seen_seqno) {
+ /* Entirely new information */
+ remote->incoming.seen_seqno = msg->seqno;
+ remote->incoming.saved_value = msg->value;
+ remote->incoming.saved_weight = msg->weight;
+ comm->gossip.value += msg->value;
+ comm->gossip.weight += msg->weight;
+ send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+ remote->awol = 0;
+ remote->count_rounds = 0;
+ remote->count_awol = 0;
+ remote->count_alive = 0;
+
+ // check if there is an update piggy backed on this message
+ // if yes then add it to the update list
+ if(msg->update_present > 0) {
+ update_t *new_update = (update_t *) malloc(sizeof(update_t));
+ memset(new_update, 0, sizeof(update_t));
+ // look for the remote limiter about whom update is being
+ // sent the update node is sent in the message, msg->node!
+ remote_limiter_t *temp_remote = map_search(comm->remote_node_map, &msg->node, sizeof(remote_node_t));
+ // an update about the node itself is possible in which case map_search would fail
+ if(temp_remote != NULL) {
+ printlog(LOG_DEBUG, "SWIM: INFECT: Receiving update about %s. Update says %d\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr), msg->reachability);
+ new_update->remote = temp_remote;
+ new_update->count = 0;
+ if(msg->incarnation > new_update->remote->incarnation) {
+ printlog(LOG_DEBUG, "SWIM: INFECT: Receiving update about %s. Update is about new incarnation\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr));
+ new_update->remote->reachability = msg->reachability;
+ new_update->remote->incarnation = msg->incarnation;
+ if(find_and_update(swim_comm->updates, new_update) == 0) {
+ swim_comm->updates = add_to_list(swim_comm->updates, new_update);
+ swim_comm->count_updates++;
+ }
+ } else if(msg->incarnation == new_update->remote->incarnation && new_update->remote->reachability == REACHABLE && msg->reachability == UNREACHABLE) {
+ printlog(LOG_DEBUG, "SWIM: INFECT: Receiving update about %s. Update about same incarnation, says node unreachable\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr));
+ new_update->remote->reachability = msg->reachability;
+ if(find_and_update(swim_comm->updates, new_update) == 0) {
+ swim_comm->updates = add_to_list(swim_comm->updates, new_update);
+ swim_comm->count_updates++;
+ } else {
+ // Ignore the update
+ printlog(LOG_DEBUG, "SWIM: INFECT: update about %s ignored\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr));
+ }
+ }
+ }
+ }
+ }
+ else if (msg->seqno > remote->incoming.seen_seqno) {
+ /* Only some of the message is old news. */
+ double diff_value = msg->value - remote->incoming.saved_value;
+ double diff_weight = msg->weight - remote->incoming.saved_weight;
+
+ remote->incoming.seen_seqno = msg->seqno;
+ remote->incoming.saved_value = msg->value;
+ remote->incoming.saved_weight = msg->weight;
+
+ comm->gossip.value += diff_value;
+ comm->gossip.weight += diff_weight;
+ send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+ remote->awol = 0;
+ remote->count_awol = 0;
+ }
+ else {
+ /* The entire message is old news. (Duplicate). */
+ /* Do nothing. */
+ }
+ // Hearing from a node previously declared unreachable
+ if(remote->reachability == UNREACHABLE) {
+ printlog(LOG_DEBUG, "SWIM: INFECT: seems like %s is back up\n", inet_ntoa(*(struct in_addr *)&remote->addr));
+ remote->reachability = SUSPECT;
+ remote->awol = GOSSIP_REMOTE_AWOL_THRESHOLD;
+ remote->count_rounds = 0;
+ remote->count_awol = 0;
+ remote->count_alive = 0;
+ }
+ }
+ else if(msg->type == CHECK) {
+ swim_receive_check(comm, sock, remote, msg);
+ }
+ else if(msg->type == PING ) {
+ swim_receive_ping(comm, sock, remote, msg);
+ }
+ else if(msg->type == PING_ACK) {
+ swim_receive_pingack(comm, sock, remote, msg);
+ }
+ else if(msg->type == CHECK_ACK) {
+ swim_receive_checkack(comm, sock, remote, msg);
+ }
+ return 0;
+}
+
+int send_gossip_swim(comm_t *comm, uint32_t id, int sock) {
+ int i, j;
+ int retry_index = 0;
+ int result = 0;
+ remote_limiter_t *remote;
+ struct sockaddr_in toaddr;
+ double msg_value, msg_weight;
+
+ /* This is the factor for the portion of value/weight to keep locally.
+ * Normally this is 1, meaning that we retain the same amount of value/weight
+ * that was sent to the peers. In the case of not being able to send to a
+ * peer though, we increment this to reclaim the value/weight locally. */
+ int message_portion = 1;
+
+ memset(&toaddr, 0, sizeof(struct sockaddr_in));
+ toaddr.sin_family = AF_INET;
+
+ msg_value = comm->gossip.value / (comm->gossip.gossip_branch + 1);
+ msg_weight = comm->gossip.weight / (comm->gossip.gossip_branch + 1);
+
+ /*Nodes to which message was sent will have a non-zero here*/
+ /* for (i = 0; i < comm->remote_node_count; i++) {
+ if(comm->remote_limiters[i].awol > 0)
+ comm->remote_limiters[i].awol++;
+ }*/
+
+ for (i = 0; i < comm->remote_node_count; i++) {
+ printlog(LOG_DEBUG, "SWIM: STATUS: Node: %s reachability: %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), comm->remote_limiters[i].reachability);
+ }
+
+ for (i = 0; i < comm->gossip.gossip_branch; ++i) {
+ int targetid = NULL_PEER;
+ int rand_count = 0;
+ message_t msg;
+
+ printlog(LOG_DEBUG, "Gossip loop iteration, i=%d, branch=%d\n", i, comm->gossip.gossip_branch);
+
+ /* If there are any peers with unacked messages, select them first. */
+ while (retry_index < comm->remote_node_count) {
+ if (comm->remote_limiters[retry_index].awol > 0 && comm->remote_limiters[retry_index].reachability == REACHABLE) {
+ targetid = retry_index;
+ printlog(LOG_DEBUG, "GOSSIP: Selected peerindex %d because it had unacked messages.\n", targetid);
+ retry_index += 1;
+ break;
+ }
+ retry_index += 1;
+ }
+
+ while (targetid == NULL_PEER && rand_count < 50) {
+ /* *Gossip node would be chosen from
+ * the array which would be randomly shuffled
+ * once all the nodes have been sent messages*
+ */
+ targetid = find_gossip_target(comm);
+ /* If we didn't find any peers the needed retransmissions, select one randomly here. */
+/* targetid = myrand() % comm->remote_node_count;
+ rand_count += 1;
+*/
+ /* Don't select an already-used index. */
+ for (j = 0; j < i; ++j) {
+ if (targetid == comm->selected[j]) {
+ printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d. selected[j=%d] is %d\n", targetid, j, comm->selected[j]);
+ targetid = NULL_PEER;
+ break;
+ }
+ }
+ if (targetid != NULL_PEER) {
+ if(comm->remote_limiters[targetid].reachability != REACHABLE) {
+ printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d. reachability is %d, and remote awol count is %d\n",
+ targetid, comm->remote_limiters[targetid].reachability, comm->remote_limiters[targetid].awol);
+ targetid = NULL_PEER;
+ }
+ }
+
+ rand_count++;
+ }
+
+ if (targetid < 0) {
+ /* Couldn't find a suitable peer to send to... */
+ message_portion += 1;
+ printlog(LOG_DEBUG, "GOSSIP: exhausted random peer search.\n");
+ continue;
+ } else {
+ printlog(LOG_DEBUG, "GOSSIP: settled on peer id %d.\n", targetid);
+ }
+
+ remote = &comm->remote_limiters[targetid];
+ comm->selected[i] = targetid;
+
+ toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
+ toaddr.sin_port = remote->port;
+
+ memset(&msg, 0, sizeof(message_t));
+ msg.magic = MAGIC_MSG;
+ msg.ident_id = id;
+ msg.value = msg_value + remote->outgoing.saved_value;
+ msg.weight = msg_weight + remote->outgoing.saved_weight;
+ msg.seqno = remote->outgoing.next_seqno;
+ msg.min_seqno = remote->outgoing.first_seqno;
+ msg.type = MSG;
+ msg.view = comm->gossip.view;
+ if(comm->gossip.membership == SWIM) {
+ swim_comm_t *swim_comm = (swim_comm_t *)comm->membership_state;
+ // piggy back an update
+ if(swim_comm->count_updates > 0) {
+ int index = myrand() % swim_comm->count_updates;
+ update_t *pointer = swim_comm->updates;
+ update_t *prev_pointer = swim_comm->updates;
+ while(index != 0) {
+ prev_pointer = pointer;
+ pointer = pointer->next;
+ index--;
+ }
+ msg.update_present = TRUE;
+ msg.reachability = pointer->remote->reachability;
+ msg.incarnation = pointer->remote->incarnation;
+ msg.node.addr = pointer->remote->addr;
+ msg.node.port = pointer->remote->port;
+ printlog(LOG_DEBUG, "SWIM: Sending update about %s\n", inet_ntoa(*(struct in_addr *)&pointer->remote->addr));
+ pointer->count++;
+ if(swim_comm->updates != pointer && pointer->count == UPDATE_THRESHOLD) {
+ // pointer is not head
+ prev_pointer->next = pointer->next;
+ pointer->next = NULL;
+ free(pointer);
+ swim_comm->count_updates--;
+ } else if(pointer->count == UPDATE_THRESHOLD) {
+ // pointer is head of the list
+ swim_comm->updates = pointer->next;
+ pointer->next = NULL;
+ free(pointer);
+ swim_comm->count_updates--;
+ if(swim_comm->count_updates == 0) {
+ swim_comm->updates = NULL;
+ }
+ }
+ }
+ }
+ remote->outgoing.next_seqno++;
+ remote->outgoing.saved_value += msg_value;
+ remote->outgoing.saved_weight += msg_weight;
+ /* Represents the number of messages we have sent to this peer without hearing a message or ACK from it. */
+ remote->awol += 1;
+
+
+#ifdef ALLOW_PARTITION
+
+ if (do_partition && ((partition_set & (1 << targetid)) == 0)) {
+ printlog(LOG_DEBUG, "Partition: GOSSIP: ignoring targetid %d\n", targetid);
+ continue;
+ }
+
+#endif
+
+ message_to_nbo(&msg);
+
+ if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+ printlog(LOG_WARN, "WARN: limiter_send_gossip: sento failed.\n");
+ result = errno;
+ break;
+ }
+ printlog(LOG_DEBUG,"SWIM: sent the gossip to %s\n", inet_ntoa(*(struct in_addr *)&toaddr.sin_addr.s_addr));
+ }
+ swim_send(comm, id, sock);
+ comm->gossip.value = msg_value * message_portion;
+ comm->gossip.weight = msg_weight * message_portion;
+
+ return result;
+}
+
+void swim_restart(comm_t *comm, int32_t view_number) {
+ /* Not sure about this yet... */
+}
+
+int swim_init(comm_t *comm, uint32_t id) {
+ comm->membership_state = malloc(sizeof(swim_comm_t));
+ if (comm->membership_state == NULL) {
+ return ENOMEM;
+ }
+ comm->connected = 1;
+ comm->recv_function = swim_receive;
+ comm->send_function = send_gossip_swim;
+ comm->restart_function = swim_restart;
+
+ swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
+
+ FILE *fp = fopen("/root/incarnation", "w+");
+ fscanf(fp, "%d", &swim_comm->incarnation);
+ fprintf(fp, "%d", swim_comm->incarnation + 1); // next time a greater incarnation would be read
+ fflush(fp);
+ fclose(fp);
+
+ return 0;
+}
+
+void swim_teardown(comm_t *comm) {
+ if (comm->membership_state)
+ free(comm->membership_state);
+}
+
+/**Bhanu: new functions being introduced*/
+/** Given a friend_id and the address of the suspected node
+ * this function sends a CHECK message to friend */
+/*
+ int help_from_friend(comm_t* comm, int friend_id, in_addr_t addr, in_port_t port, uint32_t id, int sock) {
+// send a CHECK message to friend
+int result;
+message_t msg;
+memset(&msg, 0, sizeof(message_t));
+msg.magic = MAGIC_MSG;
+msg.ident_id = id;
+msg.value = 0;
+msg.weight = 0;
+msg.seqno = 0;
+msg.min_seqno = 0;
+msg.type = CHECK;
+msg.check_target = addr;
+msg.check_port = port;
+
+// send the message
+struct sockaddr_in toaddr;
+memset(&toaddr, 0, sizeof(sockaddr_in));
+toaddr.sin_family = AF_INET;
+toaddr.sin_addr.s_addr = comm->remote_limiters[friend_id].addr;
+toaddr.sin_port = comm->remote_limiters[friend_id].port;
+message_to_nbo(&msg);
+
+if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
+result = errno;
+printlog(LOG_WARN, " - The error was |%d|\n", strerror(result));
+}
+}
+*/