Updates to autotools for library detection
[distributedratelimiting.git] / drl / swim.c
diff --git a/drl/swim.c b/drl/swim.c
new file mode 100644 (file)
index 0000000..9993ec6
--- /dev/null
@@ -0,0 +1,853 @@
+/* See the DRL-LICENSE file for this file's software license. */
+
+#include <errno.h>
+
+/* Debug output. */
+#include <stdio.h>
+#include <stdlib.h>
+
+/* Socket functions. */
+#include <sys/types.h>
+#include <sys/socket.h>
+
+/* Byte ordering and address structures. */
+#include <arpa/inet.h>
+
+/* memset() */
+#include <string.h>
+
+#include "raterouter.h"
+#include "ratetypes.h"
+#include "drl_state.h"
+#include "peer_comm.h"
+#include "swim.h"
+#include "logging.h"
+
+/* From ulogd_DRL.c */
+extern int do_partition;
+extern int partition_set;
+
+extern limiter_t limiter;
+
+/**Finds the update, if found then frees the memory of the new_update
+ * and returns 1. If find fails then this returns 0*/
+static int find_and_update(update_t *updates, update_t *new_update) {
+    if( updates == NULL ) {
+        printlog(LOG_DEBUG, "SWIM: INFECT: no existing updates\n");
+        return 0;
+    }
+    update_t *pointer = updates;
+    while(pointer != NULL) {
+        if(pointer->remote == new_update->remote && pointer->remote->incarnation >= new_update->remote->incarnation) {
+            pointer->count = 0;
+            printlog(LOG_DEBUG, "SWIM: INFECT: update already exists\n");
+            free(new_update);
+            return 1;
+        }
+        pointer = pointer->next;
+    }
+    printlog(LOG_DEBUG, "SWIM: INFECT: update not found among existing updates\n");
+    return 0;
+}
+
+/*Just adds to the end of list and returns the head*/
+update_t *add_to_list(update_t *updates, update_t *new_update) {
+    printlog(LOG_DEBUG, "SWIM: INFECT: adding to list of updates: %s is %d\n", inet_ntoa(*(struct in_addr *)&new_update->remote->addr), new_update->remote->reachability);
+    update_t *head = updates;
+    update_t *pointer;
+    if (head == NULL) {
+        head = new_update;
+    } else {
+        pointer = head;
+        while(pointer->next != NULL) {
+            pointer = pointer->next;
+        }
+        pointer->next = new_update;
+    }
+    return head;
+}
+
+/** Given the address of the suspected node this function
+ * identifies friends who can probe the suspected node
+ * After recording these, the messages for help are sent 
+ * in the next gossip round*/
+static int help_from_friends(comm_t *comm, int suspect_index, uint32_t id, int sock) {
+    printlog(LOG_DEBUG,"SWIM: In function help_from_friends suspected node: %s, index: %d\n",inet_ntoa(*(struct in_addr *)&comm->remote_limiters[suspect_index].addr), suspect_index);
+    int i=0, j = 0;
+    int result;
+    int count_friends = (comm->remote_node_count > MAX_FRIENDS ) ? MAX_FRIENDS : comm->remote_node_count; // A more logical way?
+    // remote_node_t friend_nodes[count_friends];
+    // int friend_ids[count_friends];
+
+    while( (i - j) < count_friends && i < comm->remote_node_count ) {
+        // Do not pick a friend who is suspected to be down
+        if (comm->remote_limiters[i].reachability != REACHABLE) {
+            j++; i++;
+            continue;
+        }
+        /**construct the message and send it to friend
+         * pick up friend i*/
+        message_t check_msg;
+        memset(&check_msg, 0, sizeof(message_t));
+        check_msg.magic = MAGIC_MSG;
+        check_msg.ident_id = id;
+        check_msg.value = 0;
+        check_msg.weight = 0;
+        check_msg.seqno = 0;
+        check_msg.min_seqno = 0;
+        check_msg.type = CHECK;
+        check_msg.check_target = comm->remote_limiters[suspect_index].addr;
+        check_msg.check_port = comm->remote_limiters[suspect_index].port;
+
+        // send the message
+        struct sockaddr_in toaddr;
+        memset(&toaddr, 0, sizeof(struct sockaddr_in));
+        toaddr.sin_family = AF_INET;
+        toaddr.sin_addr.s_addr = comm->remote_limiters[i].addr;
+        toaddr.sin_port = comm->remote_limiters[i].port;
+        message_to_nbo(&check_msg);
+
+        printlog(LOG_DEBUG,"SWIM: Sending CHECK message to friend %s i: %d", inet_ntoa(*(struct in_addr *)&toaddr.sin_addr.s_addr), i);
+        printlog(LOG_DEBUG," Suspect: %s", inet_ntoa(*(struct in_addr *)&check_msg.check_target));
+        printlog(LOG_DEBUG," Initial: %s suspect_index: %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[suspect_index].addr), suspect_index);
+        if (sendto(sock, &check_msg, sizeof(check_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+            printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
+            result = errno;
+            printlog(LOG_WARN, "  - The error was |%d|\n", strerror(result));
+        }
+        i++;
+    }
+    printlog(LOG_DEBUG,"SWIM: Out function help_from_friends.\n");
+    return 0;
+}
+
+/** Receiving CHECK packet*/
+static void swim_receive_check(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) {
+
+#ifdef ALLOW_PARTITION
+    int id;
+    for(id = 0; id < comm->remote_node_count; id++) {
+        if(comm->remote_limiters[id].addr == remote->addr && comm->remote_limiters[id].port == remote->port) {
+            if (do_partition && ((partition_set & (1 << id)) == 0)) {
+                printlog(LOG_DEBUG, "SWIM: Ignoring CHECK message from %d\n", id);
+                return;
+            }
+        }
+    }
+#endif
+
+//FIX
+    if(remote->reachability != REACHABLE)
+        return;
+
+    swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
+
+    // create the message that has to be sent to the suspected node
+    printlog(LOG_DEBUG,"SWIM: received CHECK message from %s", inet_ntoa(*(struct in_addr *)&remote->addr));
+    printlog(LOG_DEBUG,", sending PING to %s\n", inet_ntoa(*(struct in_addr *)&msg->check_target));
+    int result;
+    message_t ping_msg;
+    memset(&ping_msg, 0, sizeof(message_t));
+    ping_msg.magic = MAGIC_MSG;
+    ping_msg.ident_id = msg->ident_id;
+    ping_msg.value = 0;
+    ping_msg.weight = 0;
+    ping_msg.seqno = 0;
+    ping_msg.min_seqno = 0;
+    ping_msg.type = PING;
+    ping_msg.ping_source = remote->addr;
+    ping_msg.ping_port = remote->port;
+    // send the ping message
+    struct sockaddr_in toaddr;
+    memset(&toaddr, 0, sizeof(struct sockaddr_in));
+    toaddr.sin_family = AF_INET;
+    toaddr.sin_addr.s_addr = msg->check_target;
+    toaddr.sin_port = msg->check_port;
+    message_to_nbo(&ping_msg);
+
+    /** add to ping targets before sending message */
+    ping_target_t *suspect = (ping_target_t*) malloc(sizeof(ping_target_t));
+    memset(suspect, 0, sizeof(ping_target_t));
+    suspect->target.addr = msg->check_target;
+    suspect->target.port = msg->check_port;
+    suspect->source.addr = remote->addr;
+    suspect->source.port = remote->port;
+    printlog(LOG_DEBUG, "SWIM: adding %s to PING list\n", inet_ntoa(*(struct in_addr *)&suspect->target.addr));
+   
+    ping_target_t *pointer = swim_comm->ping_targets;
+    if( swim_comm->ping_targets != NULL ) {
+        while(pointer->next != NULL) {
+            pointer = pointer->next;
+        }
+        pointer->next = suspect;
+    }
+    else {
+        swim_comm->ping_targets = suspect;
+    }
+    /** added to the end of the list of ping_targets */
+   
+    if (sendto(limiter.udp_socket, &ping_msg, sizeof(ping_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+        printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
+        result = errno;
+        printlog(LOG_WARN, "  - The error was |%d|\n", strerror(result));
+    } else {
+        printlog(LOG_DEBUG,"SWIM: Sent PING message\n");
+    }
+    printlog(LOG_DEBUG,"SWIM: Processed CHECK packet\n");
+    return;
+}
+
+/** Receiving PING packet*/
+static void swim_receive_ping(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) {
+
+#ifdef ALLOW_PARTITION
+    int id;
+    for(id = 0; id < comm->remote_node_count; id++) {
+        if(comm->remote_limiters[id].addr == remote->addr && comm->remote_limiters[id].port == remote->port) {
+            if (do_partition && ((partition_set & (1 << id)) == 0)) {
+                printlog(LOG_DEBUG, "SWIM: Ignoring PING message from %d\n", id);
+                return;
+            }
+        }
+    }
+#endif
+    printlog(LOG_DEBUG,"SWIM: receiving the PING message from %s\n", inet_ntoa(*(struct in_addr *)&remote->addr));
+
+//FIX
+    if(remote->reachability != REACHABLE)
+        return;
+    
+    int result;
+    swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
+
+    message_t pingack_msg;
+    memset(&pingack_msg, 0, sizeof(message_t));
+    pingack_msg.magic = MAGIC_MSG;
+    pingack_msg.ident_id = msg->ident_id;
+    pingack_msg.value = 0;
+    pingack_msg.weight = 0;
+    pingack_msg.seqno = 0;
+    pingack_msg.min_seqno = 0;
+    pingack_msg.type = PING_ACK;
+    
+    swim_comm->incarnation++;
+    pingack_msg.update_present = TRUE;
+    pingack_msg.reachability = REACHABLE;
+    pingack_msg.incarnation = swim_comm->incarnation;
+    FILE *fp = fopen("/root/incarnation", "w+");
+    fprintf(fp, "%d", swim_comm->incarnation + 1);
+    fflush(fp);
+    fclose(fp);
+    // send PING_ACK
+    struct sockaddr_in toaddr;
+    memset(&toaddr, 0, sizeof(struct sockaddr_in));
+    toaddr.sin_family = AF_INET;
+    toaddr.sin_addr.s_addr = remote->addr;
+    toaddr.sin_port = remote->port;
+    message_to_nbo(&pingack_msg);
+
+    if (sendto(limiter.udp_socket, &pingack_msg, sizeof(pingack_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+        printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
+        result = errno;
+        printlog(LOG_WARN, "  - The error was |%d|\n", strerror(result));
+    } else {
+        printlog(LOG_DEBUG, "SWIM: sent PING_ACK to %s\n", inet_ntoa(*(struct in_addr *)&remote->addr));
+    }
+    printlog(LOG_DEBUG,"SWIM: Processed PING packet\n");
+    return;
+}
+
+/** Receiving PING_ACK packet*/
+static void swim_receive_pingack(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) {
+    // find the source which requested this ping and inform it with CHECK_ACK, ALIVE
+    // look up in the ping_targets list.
+    printlog(LOG_DEBUG, "SWIM: receiving the PING_ACK message from %s\n", inet_ntoa(*(struct in_addr *)&remote->addr));
+    flushlog();
+    int result, confirm;
+    int delete_head = 0;
+    ping_target_t* pointer;
+    ping_target_t* prev_pointer;
+    swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
+    pointer = swim_comm->ping_targets;
+    prev_pointer = swim_comm->ping_targets;
+
+/*  
+ *  Removed this because if a PING_ACK arrives then CHECK_ACK would be
+ *  sent to all the nodes which requested a check on this node. Hence pointer
+ *  could be NULL but we could receive a CHECK_ACK packet
+ *  if (pointer == NULL) {
+        printlog(LOG_DEBUG, "SWIM: Received PING_ACK for a PING not sent\n");
+        return;
+    }
+*/    
+    while(pointer!=NULL) {
+        if(pointer->target.addr == remote->addr && pointer->target.port == remote->port) {
+            // suspect has been found in the list
+            // now construct the CHECK_ACK message and send it to source
+            message_t checkack_msg;
+            memset(&checkack_msg, 0, sizeof(message_t));
+            checkack_msg.magic = MAGIC_MSG;
+            checkack_msg.ident_id = msg->ident_id;
+            checkack_msg.value = 0;
+            checkack_msg.weight = 0;
+            checkack_msg.seqno = 0;
+            checkack_msg.min_seqno = 0;
+            checkack_msg.type = CHECK_ACK;
+            checkack_msg.checkack_value = ALIVE;
+            // inform the source of the addr and port of suspected node
+            checkack_msg.check_target = remote->addr;
+            checkack_msg.check_port = remote->port;
+            struct sockaddr_in toaddr;
+            memset(&toaddr, 0, sizeof(struct sockaddr_in));
+            // found source
+            toaddr.sin_family = AF_INET;
+            toaddr.sin_addr.s_addr = pointer->source.addr;
+            toaddr.sin_port = pointer->source.port;
+            message_to_nbo(&checkack_msg);
+
+            if (sendto(limiter.udp_socket, &checkack_msg, sizeof(checkack_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+                printlog(LOG_WARN, "WARN: swim_receive_pingack : sento failed.\n");
+                result = errno;
+                printlog(LOG_WARN, "  - The error was |%d|\n", strerror(result));
+            }
+
+            /** Now delete this suspect from friends list of nodes*/
+            if(prev_pointer == pointer) {
+                swim_comm->ping_targets = pointer->next;
+                pointer->next = NULL;
+                free(pointer);
+                pointer = swim_comm->ping_targets;
+                delete_head = 1;
+            } else {
+                prev_pointer->next = pointer->next;
+                pointer->next = NULL;
+                free(pointer);
+                pointer = prev_pointer;
+            }
+            confirm = 1;
+        }
+        prev_pointer = pointer;
+        if(pointer != NULL && delete_head != 1) {
+            pointer = pointer->next;
+        }
+        delete_head = 0;
+        printf("SWIM: PING ACK\n");
+    }
+    // PING_ACK has been received then add to the list of updates
+    remote_node_t sender;
+    memset(&sender, 0, sizeof(remote_node_t));
+    sender.addr = remote->addr;
+    sender.port = remote->port;
+    update_t* new_update = (update_t *) malloc(sizeof(update_t));
+    memset(new_update, 0, sizeof(update_t));
+    new_update->remote = map_search(comm->remote_node_map, &sender, sizeof(remote_node_t));
+    if(new_update->remote == NULL) {
+        printlog(LOG_DEBUG, "SWIM: PANIC: PING_ACK received from an unknown node %s\n",inet_ntoa(*(struct in_addr *)&sender.addr));
+    }
+    new_update->count = 0;
+
+    if(msg->incarnation > new_update->remote->incarnation) {
+        new_update->remote->incarnation = msg->incarnation;
+        new_update->remote->reachability = REACHABLE;
+        new_update->remote->awol = 0;
+        new_update->remote->count_rounds = 0;
+        new_update->remote->count_awol = 0;
+        new_update->remote->count_alive = 0;
+        if( find_and_update(swim_comm->updates, new_update) == 0 ) {
+            swim_comm->updates = add_to_list(swim_comm->updates, new_update);
+            swim_comm->count_updates++;
+        }
+    } else if(msg->incarnation == new_update->remote->incarnation) {
+        // if the node previously thought that sender was down then it prevails
+        // else if the node thought it was up then there is no change
+    }
+
+    if(confirm != 1) printlog(LOG_DEBUG,"SWIM: PING_ACK did not match entries in list\n");
+    printlog(LOG_DEBUG,"SWIM: Processed PING_ACK packet\n");
+    return;
+}
+
+/** Receiving CHECK_ACK packet*/
+static void swim_receive_checkack(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) {
+    printlog(LOG_DEBUG, "SWIM: receiving the CHECK_ACK message from %s\n", inet_ntoa(*(struct in_addr *)&remote->addr));
+    int i;
+    for( i = 0; i < comm->remote_node_count; i++) {
+        if(comm->remote_limiters[i].addr == msg->check_target && comm->remote_limiters[i].port == msg->check_port) {
+            if(msg->checkack_value == ALIVE)
+                comm->remote_limiters[i].count_alive++;
+            else if (msg->checkack_value == AWOL)
+                comm->remote_limiters[i].count_awol++;
+        }
+    }
+    printlog(LOG_DEBUG,"SWIM: Processed CHECK_ACK packet\n");
+    return;
+}
+
+static int swim_send(comm_t *comm, int id, int sock) {
+    int i, result;
+    swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
+
+    /**SOURCE: Send messages to friends to check on 
+     * nodes which are suspected to be down*/
+    for(i = 0; i < comm->remote_node_count; i++) {
+        printlog(LOG_DEBUG, "SWIM: AWOL count of %s is %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), comm->remote_limiters[i].awol);
+        if(comm->remote_limiters[i].awol == GOSSIP_REMOTE_AWOL_THRESHOLD) {
+            // HACK to make sure this part of code is entered only once
+            comm->remote_limiters[i].reachability = SUSPECT;
+            comm->remote_limiters[i].awol++;
+            help_from_friends(comm, i, id, sock);
+        }
+    }
+
+    /**SOURCE: Count number of rounds since the node has been suspected
+     * If in this process the count reaches threshold then take action
+     */
+    for (i = 0; i < comm->remote_node_count; i++) {
+        if(comm->remote_limiters[i].reachability == SUSPECT) {
+            comm->remote_limiters[i].count_rounds++;
+            printlog(LOG_DEBUG, "SWIM: ROUNDS count on %s index %d is %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), i, comm->remote_limiters[i].count_rounds);
+            if(comm->remote_limiters[i].count_rounds > SOURCE_THRESHOLD) {
+                if(comm->remote_limiters[i].count_alive > 0) {
+                    printlog(LOG_DEBUG,"SWIM: the node %s was up, wrongly suspected\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr));
+                    comm->remote_limiters[i].reachability = REACHABLE;
+                    comm->remote_limiters[i].count_rounds = 0;
+                    comm->remote_limiters[i].count_awol = 0;
+                    comm->remote_limiters[i].count_alive = 0;
+                    // FIX
+                    comm->remote_limiters[i].awol = 0;
+                }
+                else if (comm->remote_limiters[i].count_awol > 0) {
+                    comm->remote_limiters[i].reachability = UNREACHABLE;
+                    update_t* new_update = (update_t *) malloc(sizeof(update_t));
+                    memset(new_update, 0, sizeof(update_t));
+                    new_update->remote = &comm->remote_limiters[i];
+                    new_update->count = 0;
+                    // comm->remote_limiters[i].incoming.seen_seqno = 0;
+                    printlog(LOG_DEBUG, "SWIM: INFECT: Update down information %s\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr));
+                    if(find_and_update(swim_comm->updates, new_update) == 0) {
+                        swim_comm->updates = add_to_list(swim_comm->updates, new_update);
+                        swim_comm->count_updates++;
+                    }
+                    printlog(LOG_DEBUG,"SWIM: The node %s is down. reachability: %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), comm->remote_limiters[i].reachability);// The node is check_list.target
+                }
+                else {
+                    /**Even friends have not responded, request for help from more friends?*/
+                    printlog(LOG_DEBUG,"SWIM: Last ditch effort, even friends did not respond\n");
+                    comm->remote_limiters[i].reachability = SUSPECT;
+                    comm->remote_limiters[i].count_rounds = 0; // CHECK
+                    help_from_friends(comm, i, id, sock);
+                }
+            }
+        }
+    }
+
+    /**Actions performed by "FRIEND"*/
+    // DELETE THIS LOOP
+    ping_target_t* ping_list = swim_comm->ping_targets;
+    while(ping_list != NULL) {
+        printlog(LOG_DEBUG, "SWIM: in PING list %s\n", inet_ntoa(*(struct in_addr *)&ping_list->target.addr));
+        ping_list = ping_list->next;
+    }
+    ping_list = swim_comm->ping_targets;
+    ping_target_t* ping_list_prev = swim_comm->ping_targets;
+    int delete_head = 0;
+    while(ping_list != NULL) {
+        ping_list->count++;
+        printlog(LOG_DEBUG,"SWIM: friend keeping track of gossip rounds since PING.\n");
+        // if in this process some node hits threshold then
+        // send AWOL and delete it from this list
+        if(ping_list->count >= FRIEND_THRESHOLD) {
+            printlog(LOG_DEBUG,"SWIM: friend declaring AWOL.\n");
+            message_t checkack_msg;
+            memset(&checkack_msg, 0, sizeof(message_t));
+            checkack_msg.magic = MAGIC_MSG;
+            checkack_msg.ident_id = id;
+            checkack_msg.value = 0;
+            checkack_msg.weight = 0;
+            checkack_msg.seqno = 0;
+            checkack_msg.min_seqno = 0;
+            checkack_msg.type = CHECK_ACK;
+            checkack_msg.checkack_value = AWOL;
+            // inform the source of the addr and port of suspected node
+            checkack_msg.check_target = ping_list->target.addr;
+            checkack_msg.check_port = ping_list->target.port;
+            struct sockaddr_in toaddr;
+            memset(&toaddr, 0, sizeof(struct sockaddr_in));
+            toaddr.sin_family = AF_INET;
+            toaddr.sin_addr.s_addr = ping_list->source.addr;
+            toaddr.sin_port = ping_list->source.port;
+            message_to_nbo(&checkack_msg);
+
+            if (sendto(sock, &checkack_msg, sizeof(checkack_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+                printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
+                result = errno;
+                printlog(LOG_WARN, "  - The error was |%d|\n", strerror(result));
+            }
+            // now the deletion: after deletion we want to continue from next node
+            printlog(LOG_DEBUG, "SWIM: deleting from PING list %s\n", inet_ntoa(*(struct in_addr *)&ping_list->target.addr));
+            if(ping_list_prev == ping_list) {
+                swim_comm->ping_targets = ping_list->next;
+                ping_list->next = NULL;
+                free(ping_list);
+                ping_list = swim_comm->ping_targets;
+                delete_head = 1;
+            } else {
+                ping_list_prev->next = ping_list->next;
+                ping_list->next = NULL;
+                free(ping_list);
+                ping_list = ping_list_prev;
+            }
+        }
+        ping_list_prev = ping_list;
+        if (ping_list != NULL && delete_head != 1) ping_list = ping_list->next;
+        delete_head = 0;
+    }
+
+    return 0;
+}
+
+
+/** Handle SWIM packets received*/
+int swim_receive(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) {
+    swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
+
+    if (msg->type == ACK) {
+        /* If ACK was received then reset the awol count */
+        if (msg->seqno == remote->outgoing.next_seqno - 1) {
+            /* Ack for most recent message.  Clear saved state. */
+            remote->outgoing.first_seqno = remote->outgoing.next_seqno;
+            remote->outgoing.saved_value = 0;
+            remote->outgoing.saved_weight = 0;
+
+            remote->awol = 0;
+            remote->count_awol = 0;
+        }
+        /* Ignore ack if it isn't for most recent message. */
+    } else if (msg->type == MSG) {
+        if (msg->min_seqno > remote->incoming.seen_seqno) {
+            /* Entirely new information */
+            remote->incoming.seen_seqno = msg->seqno;
+            remote->incoming.saved_value = msg->value;
+            remote->incoming.saved_weight = msg->weight;
+            comm->gossip.value += msg->value;
+            comm->gossip.weight += msg->weight;
+            send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+            remote->awol = 0;
+            remote->count_rounds = 0;
+            remote->count_awol = 0;
+            remote->count_alive = 0;
+
+            // check if there is an update piggy backed on this message
+            // if yes then add it to the update list
+            if(msg->update_present > 0) {
+                update_t *new_update = (update_t *) malloc(sizeof(update_t));
+                memset(new_update, 0, sizeof(update_t));
+                // look for the remote limiter about whom update is being
+                // sent the update node is sent in the message, msg->node!
+                remote_limiter_t *temp_remote = map_search(comm->remote_node_map, &msg->node, sizeof(remote_node_t));
+                // an update about the node itself is possible in which case map_search would fail
+                if(temp_remote != NULL) {
+                    printlog(LOG_DEBUG, "SWIM: INFECT: Receiving update about %s. Update says %d\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr), msg->reachability);
+                    new_update->remote = temp_remote;
+                    new_update->count = 0;
+                    if(msg->incarnation > new_update->remote->incarnation) {
+                        printlog(LOG_DEBUG, "SWIM: INFECT: Receiving update about %s. Update is about new incarnation\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr));
+                        new_update->remote->reachability = msg->reachability;
+                        new_update->remote->incarnation = msg->incarnation;
+                        if(find_and_update(swim_comm->updates, new_update) == 0) {
+                            swim_comm->updates = add_to_list(swim_comm->updates, new_update);
+                            swim_comm->count_updates++;
+                        }
+                    } else if(msg->incarnation == new_update->remote->incarnation && new_update->remote->reachability == REACHABLE && msg->reachability == UNREACHABLE) {
+                        printlog(LOG_DEBUG, "SWIM: INFECT: Receiving update about %s. Update about same incarnation, says node unreachable\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr));
+                        new_update->remote->reachability = msg->reachability;
+                        if(find_and_update(swim_comm->updates, new_update) == 0) {
+                            swim_comm->updates = add_to_list(swim_comm->updates, new_update);
+                            swim_comm->count_updates++;
+                        } else {
+                            // Ignore the update
+                            printlog(LOG_DEBUG, "SWIM: INFECT: update about %s ignored\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr));
+                        }
+                    }
+                }
+            }
+        } 
+        else if (msg->seqno > remote->incoming.seen_seqno) {
+            /* Only some of the message is old news. */
+            double diff_value = msg->value - remote->incoming.saved_value;
+            double diff_weight = msg->weight - remote->incoming.saved_weight;
+
+            remote->incoming.seen_seqno = msg->seqno;
+            remote->incoming.saved_value = msg->value;
+            remote->incoming.saved_weight = msg->weight;
+
+            comm->gossip.value += diff_value;
+            comm->gossip.weight += diff_weight;
+            send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+            remote->awol = 0;
+            remote->count_awol = 0;
+        } 
+        else {
+            /* The entire message is old news. (Duplicate). */
+            /* Do nothing. */
+        }
+        // Hearing from a node previously declared unreachable
+        if(remote->reachability == UNREACHABLE) {
+            printlog(LOG_DEBUG, "SWIM: INFECT: seems like %s is back up\n", inet_ntoa(*(struct in_addr *)&remote->addr));
+            remote->reachability = SUSPECT;
+            remote->awol = GOSSIP_REMOTE_AWOL_THRESHOLD;
+            remote->count_rounds = 0;
+            remote->count_awol = 0;
+            remote->count_alive = 0;
+        }
+    }
+    else if(msg->type == CHECK) {
+        swim_receive_check(comm, sock, remote, msg);
+    }
+    else if(msg->type == PING ) {
+        swim_receive_ping(comm, sock, remote, msg);
+    }
+    else if(msg->type == PING_ACK) {
+        swim_receive_pingack(comm, sock, remote, msg);
+    }
+    else if(msg->type == CHECK_ACK) {
+        swim_receive_checkack(comm, sock, remote, msg);
+    }
+    return 0;
+}
+
+int send_gossip_swim(comm_t *comm, uint32_t id, int sock) {
+    int i, j;
+    int retry_index = 0;
+    int result = 0;
+    remote_limiter_t *remote;
+    struct sockaddr_in toaddr;
+    double msg_value, msg_weight;
+
+    /* This is the factor for the portion of value/weight to keep locally.
+     * Normally this is 1, meaning that we retain the same amount of value/weight
+     * that was sent to the peers.  In the case of not being able to send to a
+     * peer though, we increment this to reclaim the value/weight locally. */
+    int message_portion = 1;
+
+    memset(&toaddr, 0, sizeof(struct sockaddr_in));
+    toaddr.sin_family = AF_INET;
+
+    msg_value = comm->gossip.value / (comm->gossip.gossip_branch + 1);
+    msg_weight = comm->gossip.weight / (comm->gossip.gossip_branch + 1);
+
+    /*Nodes to which message was sent will have a non-zero here*/
+    /*    for (i = 0; i < comm->remote_node_count; i++) {
+          if(comm->remote_limiters[i].awol > 0)
+          comm->remote_limiters[i].awol++;
+          }*/
+
+    for (i = 0; i < comm->remote_node_count; i++) {
+        printlog(LOG_DEBUG, "SWIM: STATUS: Node: %s reachability: %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), comm->remote_limiters[i].reachability);
+    }
+
+    for (i = 0; i < comm->gossip.gossip_branch; ++i) {
+        int targetid = NULL_PEER;
+        int rand_count = 0;
+        message_t msg;
+
+        printlog(LOG_DEBUG, "Gossip loop iteration, i=%d, branch=%d\n", i, comm->gossip.gossip_branch);
+
+        /* If there are any peers with unacked messages, select them first. */
+        while (retry_index < comm->remote_node_count) {
+            if (comm->remote_limiters[retry_index].awol > 0 && comm->remote_limiters[retry_index].reachability == REACHABLE) {
+                targetid = retry_index;
+                printlog(LOG_DEBUG, "GOSSIP: Selected peerindex %d because it had unacked messages.\n", targetid);
+                retry_index += 1;
+                break;
+            }
+            retry_index += 1;
+        }
+
+        while (targetid == NULL_PEER && rand_count < 50) {
+            /* *Gossip node would be chosen from
+            * the array which would be randomly shuffled
+            * once all the nodes have been sent messages*
+            */
+            targetid = find_gossip_target(comm);
+            /* If we didn't find any peers the needed retransmissions, select one randomly here. */
+/*                targetid = myrand() % comm->remote_node_count;
+                rand_count += 1;
+*/
+            /* Don't select an already-used index. */
+            for (j = 0; j < i; ++j) {
+                if (targetid == comm->selected[j]) {
+                    printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d.  selected[j=%d] is %d\n", targetid, j, comm->selected[j]); 
+                    targetid = NULL_PEER;
+                    break;
+                }
+            }
+            if (targetid != NULL_PEER) {
+                if(comm->remote_limiters[targetid].reachability != REACHABLE) {
+                    printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d.  reachability is %d, and remote awol count is %d\n",
+                            targetid, comm->remote_limiters[targetid].reachability, comm->remote_limiters[targetid].awol);
+                    targetid = NULL_PEER;
+                }
+            }
+            
+            rand_count++;
+        }
+
+        if (targetid < 0) {
+            /* Couldn't find a suitable peer to send to... */
+            message_portion += 1;
+            printlog(LOG_DEBUG, "GOSSIP: exhausted random peer search.\n");
+            continue;
+        } else {
+            printlog(LOG_DEBUG, "GOSSIP: settled on peer id %d.\n", targetid);
+        }
+
+        remote = &comm->remote_limiters[targetid];
+        comm->selected[i] = targetid;
+
+        toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
+        toaddr.sin_port = remote->port;
+
+        memset(&msg, 0, sizeof(message_t));
+        msg.magic = MAGIC_MSG;
+        msg.ident_id = id;
+        msg.value = msg_value + remote->outgoing.saved_value;
+        msg.weight = msg_weight + remote->outgoing.saved_weight;
+        msg.seqno = remote->outgoing.next_seqno;
+        msg.min_seqno = remote->outgoing.first_seqno;
+        msg.type = MSG;
+        msg.view = comm->gossip.view;
+        if(comm->gossip.membership == SWIM) {
+            swim_comm_t *swim_comm = (swim_comm_t *)comm->membership_state;
+            // piggy back an update
+            if(swim_comm->count_updates > 0) {
+                int index = myrand() % swim_comm->count_updates;
+                update_t *pointer = swim_comm->updates;
+                update_t *prev_pointer = swim_comm->updates;
+                while(index != 0) {
+                    prev_pointer = pointer;
+                    pointer = pointer->next;
+                    index--;
+                }
+                msg.update_present = TRUE;
+                msg.reachability = pointer->remote->reachability;
+                msg.incarnation = pointer->remote->incarnation;
+                msg.node.addr = pointer->remote->addr;
+                msg.node.port = pointer->remote->port;
+                printlog(LOG_DEBUG, "SWIM: Sending update about %s\n", inet_ntoa(*(struct in_addr *)&pointer->remote->addr));
+                pointer->count++;
+                if(swim_comm->updates != pointer && pointer->count == UPDATE_THRESHOLD) {
+                    // pointer is not head
+                    prev_pointer->next = pointer->next;
+                    pointer->next = NULL;
+                    free(pointer);
+                    swim_comm->count_updates--;
+                } else if(pointer->count == UPDATE_THRESHOLD) {
+                    // pointer is head of the list
+                    swim_comm->updates = pointer->next;
+                    pointer->next = NULL;
+                    free(pointer);
+                    swim_comm->count_updates--;
+                    if(swim_comm->count_updates == 0) {
+                        swim_comm->updates = NULL;
+                    }
+                }
+            }
+        }
+        remote->outgoing.next_seqno++;
+        remote->outgoing.saved_value += msg_value;
+        remote->outgoing.saved_weight += msg_weight;
+        /* Represents the number of messages we have sent to this peer without hearing a message or ACK from it. */
+        remote->awol += 1;
+
+
+#ifdef ALLOW_PARTITION
+
+        if (do_partition && ((partition_set & (1 << targetid)) == 0)) {
+            printlog(LOG_DEBUG, "Partition: GOSSIP: ignoring targetid %d\n", targetid);
+            continue;
+        }
+
+#endif
+
+        message_to_nbo(&msg);
+
+        if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+            printlog(LOG_WARN, "WARN: limiter_send_gossip: sento failed.\n");
+            result = errno;
+            break;
+        }
+        printlog(LOG_DEBUG,"SWIM: sent the gossip to %s\n", inet_ntoa(*(struct in_addr *)&toaddr.sin_addr.s_addr));
+    }
+    swim_send(comm, id, sock);
+    comm->gossip.value = msg_value * message_portion;
+    comm->gossip.weight = msg_weight * message_portion;
+
+    return result;
+}
+
+void swim_restart(comm_t *comm, int32_t view_number) {
+    /* Not sure about this yet... */
+}
+
+int swim_init(comm_t *comm, uint32_t id) {
+    comm->membership_state = malloc(sizeof(swim_comm_t));
+    if (comm->membership_state == NULL) {
+        return ENOMEM;
+    }
+    comm->connected = 1;
+    comm->recv_function = swim_receive;
+    comm->send_function = send_gossip_swim;
+    comm->restart_function = swim_restart;
+
+    swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
+
+    FILE *fp = fopen("/root/incarnation", "w+");
+    fscanf(fp, "%d", &swim_comm->incarnation);
+    fprintf(fp, "%d", swim_comm->incarnation + 1); // next time a greater incarnation would be read
+    fflush(fp);
+    fclose(fp);
+
+    return 0;
+}
+
+void swim_teardown(comm_t *comm) {
+    if (comm->membership_state)
+        free(comm->membership_state);
+}
+
+/**Bhanu: new functions being introduced*/
+/** Given a friend_id and the address of the suspected node
+ * this function sends a CHECK message to friend */
+/*
+   int help_from_friend(comm_t* comm, int friend_id, in_addr_t addr, in_port_t port, uint32_t id, int sock) {
+// send a CHECK message to friend
+int result;
+message_t msg;
+memset(&msg, 0, sizeof(message_t));
+msg.magic = MAGIC_MSG;
+msg.ident_id = id;
+msg.value = 0;
+msg.weight = 0;
+msg.seqno = 0;
+msg.min_seqno = 0;
+msg.type = CHECK;
+msg.check_target = addr;
+msg.check_port = port;
+
+// send the message
+struct sockaddr_in toaddr;
+memset(&toaddr, 0, sizeof(sockaddr_in));
+toaddr.sin_family = AF_INET;
+toaddr.sin_addr.s_addr = comm->remote_limiters[friend_id].addr;
+toaddr.sin_port = comm->remote_limiters[friend_id].port;
+message_to_nbo(&msg);
+
+if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
+result = errno;
+printlog(LOG_WARN, "  - The error was |%d|\n", strerror(result));
+}
+}
+*/