Updates to autotools for library detection
[distributedratelimiting.git] / drl / swim.c
1 /* See the DRL-LICENSE file for this file's software license. */
2
3 #include <errno.h>
4
5 /* Debug output. */
6 #include <stdio.h>
7 #include <stdlib.h>
8
9 /* Socket functions. */
10 #include <sys/types.h>
11 #include <sys/socket.h>
12
13 /* Byte ordering and address structures. */
14 #include <arpa/inet.h>
15
16 /* memset() */
17 #include <string.h>
18
19 #include "raterouter.h"
20 #include "ratetypes.h"
21 #include "drl_state.h"
22 #include "peer_comm.h"
23 #include "swim.h"
24 #include "logging.h"
25
26 /* From ulogd_DRL.c */
27 extern int do_partition;
28 extern int partition_set;
29
30 extern limiter_t limiter;
31
32 /**Finds the update, if found then frees the memory of the new_update
33  * and returns 1. If find fails then this returns 0*/
34 static int find_and_update(update_t *updates, update_t *new_update) {
35     if( updates == NULL ) {
36         printlog(LOG_DEBUG, "SWIM: INFECT: no existing updates\n");
37         return 0;
38     }
39     update_t *pointer = updates;
40     while(pointer != NULL) {
41         if(pointer->remote == new_update->remote && pointer->remote->incarnation >= new_update->remote->incarnation) {
42             pointer->count = 0;
43             printlog(LOG_DEBUG, "SWIM: INFECT: update already exists\n");
44             free(new_update);
45             return 1;
46         }
47         pointer = pointer->next;
48     }
49     printlog(LOG_DEBUG, "SWIM: INFECT: update not found among existing updates\n");
50     return 0;
51 }
52
53 /*Just adds to the end of list and returns the head*/
54 update_t *add_to_list(update_t *updates, update_t *new_update) {
55     printlog(LOG_DEBUG, "SWIM: INFECT: adding to list of updates: %s is %d\n", inet_ntoa(*(struct in_addr *)&new_update->remote->addr), new_update->remote->reachability);
56     update_t *head = updates;
57     update_t *pointer;
58     if (head == NULL) {
59         head = new_update;
60     } else {
61         pointer = head;
62         while(pointer->next != NULL) {
63             pointer = pointer->next;
64         }
65         pointer->next = new_update;
66     }
67     return head;
68 }
69
70 /** Given the address of the suspected node this function
71  * identifies friends who can probe the suspected node
72  * After recording these, the messages for help are sent 
73  * in the next gossip round*/
74 static int help_from_friends(comm_t *comm, int suspect_index, uint32_t id, int sock) {
75     printlog(LOG_DEBUG,"SWIM: In function help_from_friends suspected node: %s, index: %d\n",inet_ntoa(*(struct in_addr *)&comm->remote_limiters[suspect_index].addr), suspect_index);
76     int i=0, j = 0;
77     int result;
78     int count_friends = (comm->remote_node_count > MAX_FRIENDS ) ? MAX_FRIENDS : comm->remote_node_count; // A more logical way?
79     // remote_node_t friend_nodes[count_friends];
80     // int friend_ids[count_friends];
81
82     while( (i - j) < count_friends && i < comm->remote_node_count ) {
83         // Do not pick a friend who is suspected to be down
84         if (comm->remote_limiters[i].reachability != REACHABLE) {
85             j++; i++;
86             continue;
87         }
88         /**construct the message and send it to friend
89          * pick up friend i*/
90         message_t check_msg;
91         memset(&check_msg, 0, sizeof(message_t));
92         check_msg.magic = MAGIC_MSG;
93         check_msg.ident_id = id;
94         check_msg.value = 0;
95         check_msg.weight = 0;
96         check_msg.seqno = 0;
97         check_msg.min_seqno = 0;
98         check_msg.type = CHECK;
99         check_msg.check_target = comm->remote_limiters[suspect_index].addr;
100         check_msg.check_port = comm->remote_limiters[suspect_index].port;
101
102         // send the message
103         struct sockaddr_in toaddr;
104         memset(&toaddr, 0, sizeof(struct sockaddr_in));
105         toaddr.sin_family = AF_INET;
106         toaddr.sin_addr.s_addr = comm->remote_limiters[i].addr;
107         toaddr.sin_port = comm->remote_limiters[i].port;
108         message_to_nbo(&check_msg);
109
110         printlog(LOG_DEBUG,"SWIM: Sending CHECK message to friend %s i: %d", inet_ntoa(*(struct in_addr *)&toaddr.sin_addr.s_addr), i);
111         printlog(LOG_DEBUG," Suspect: %s", inet_ntoa(*(struct in_addr *)&check_msg.check_target));
112         printlog(LOG_DEBUG," Initial: %s suspect_index: %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[suspect_index].addr), suspect_index);
113         if (sendto(sock, &check_msg, sizeof(check_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
114             printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
115             result = errno;
116             printlog(LOG_WARN, "  - The error was |%d|\n", strerror(result));
117         }
118         i++;
119     }
120     printlog(LOG_DEBUG,"SWIM: Out function help_from_friends.\n");
121     return 0;
122 }
123
124 /** Receiving CHECK packet*/
125 static void swim_receive_check(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) {
126
127 #ifdef ALLOW_PARTITION
128     int id;
129     for(id = 0; id < comm->remote_node_count; id++) {
130         if(comm->remote_limiters[id].addr == remote->addr && comm->remote_limiters[id].port == remote->port) {
131             if (do_partition && ((partition_set & (1 << id)) == 0)) {
132                 printlog(LOG_DEBUG, "SWIM: Ignoring CHECK message from %d\n", id);
133                 return;
134             }
135         }
136     }
137 #endif
138
139 //FIX
140     if(remote->reachability != REACHABLE)
141         return;
142
143     swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
144
145     // create the message that has to be sent to the suspected node
146     printlog(LOG_DEBUG,"SWIM: received CHECK message from %s", inet_ntoa(*(struct in_addr *)&remote->addr));
147     printlog(LOG_DEBUG,", sending PING to %s\n", inet_ntoa(*(struct in_addr *)&msg->check_target));
148     int result;
149     message_t ping_msg;
150     memset(&ping_msg, 0, sizeof(message_t));
151     ping_msg.magic = MAGIC_MSG;
152     ping_msg.ident_id = msg->ident_id;
153     ping_msg.value = 0;
154     ping_msg.weight = 0;
155     ping_msg.seqno = 0;
156     ping_msg.min_seqno = 0;
157     ping_msg.type = PING;
158     ping_msg.ping_source = remote->addr;
159     ping_msg.ping_port = remote->port;
160     // send the ping message
161     struct sockaddr_in toaddr;
162     memset(&toaddr, 0, sizeof(struct sockaddr_in));
163     toaddr.sin_family = AF_INET;
164     toaddr.sin_addr.s_addr = msg->check_target;
165     toaddr.sin_port = msg->check_port;
166     message_to_nbo(&ping_msg);
167
168     /** add to ping targets before sending message */
169     ping_target_t *suspect = (ping_target_t*) malloc(sizeof(ping_target_t));
170     memset(suspect, 0, sizeof(ping_target_t));
171     suspect->target.addr = msg->check_target;
172     suspect->target.port = msg->check_port;
173     suspect->source.addr = remote->addr;
174     suspect->source.port = remote->port;
175     printlog(LOG_DEBUG, "SWIM: adding %s to PING list\n", inet_ntoa(*(struct in_addr *)&suspect->target.addr));
176    
177     ping_target_t *pointer = swim_comm->ping_targets;
178     if( swim_comm->ping_targets != NULL ) {
179         while(pointer->next != NULL) {
180             pointer = pointer->next;
181         }
182         pointer->next = suspect;
183     }
184     else {
185         swim_comm->ping_targets = suspect;
186     }
187     /** added to the end of the list of ping_targets */
188    
189     if (sendto(limiter.udp_socket, &ping_msg, sizeof(ping_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
190         printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
191         result = errno;
192         printlog(LOG_WARN, "  - The error was |%d|\n", strerror(result));
193     } else {
194         printlog(LOG_DEBUG,"SWIM: Sent PING message\n");
195     }
196     printlog(LOG_DEBUG,"SWIM: Processed CHECK packet\n");
197     return;
198 }
199
200 /** Receiving PING packet*/
201 static void swim_receive_ping(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) {
202
203 #ifdef ALLOW_PARTITION
204     int id;
205     for(id = 0; id < comm->remote_node_count; id++) {
206         if(comm->remote_limiters[id].addr == remote->addr && comm->remote_limiters[id].port == remote->port) {
207             if (do_partition && ((partition_set & (1 << id)) == 0)) {
208                 printlog(LOG_DEBUG, "SWIM: Ignoring PING message from %d\n", id);
209                 return;
210             }
211         }
212     }
213 #endif
214  
215     printlog(LOG_DEBUG,"SWIM: receiving the PING message from %s\n", inet_ntoa(*(struct in_addr *)&remote->addr));
216
217 //FIX
218     if(remote->reachability != REACHABLE)
219         return;
220     
221     int result;
222     swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
223
224     message_t pingack_msg;
225     memset(&pingack_msg, 0, sizeof(message_t));
226     pingack_msg.magic = MAGIC_MSG;
227     pingack_msg.ident_id = msg->ident_id;
228     pingack_msg.value = 0;
229     pingack_msg.weight = 0;
230     pingack_msg.seqno = 0;
231     pingack_msg.min_seqno = 0;
232     pingack_msg.type = PING_ACK;
233     
234     swim_comm->incarnation++;
235     pingack_msg.update_present = TRUE;
236     pingack_msg.reachability = REACHABLE;
237     pingack_msg.incarnation = swim_comm->incarnation;
238     FILE *fp = fopen("/root/incarnation", "w+");
239     fprintf(fp, "%d", swim_comm->incarnation + 1);
240     fflush(fp);
241     fclose(fp);
242     // send PING_ACK
243     struct sockaddr_in toaddr;
244     memset(&toaddr, 0, sizeof(struct sockaddr_in));
245     toaddr.sin_family = AF_INET;
246     toaddr.sin_addr.s_addr = remote->addr;
247     toaddr.sin_port = remote->port;
248     message_to_nbo(&pingack_msg);
249
250     if (sendto(limiter.udp_socket, &pingack_msg, sizeof(pingack_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
251         printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
252         result = errno;
253         printlog(LOG_WARN, "  - The error was |%d|\n", strerror(result));
254     } else {
255         printlog(LOG_DEBUG, "SWIM: sent PING_ACK to %s\n", inet_ntoa(*(struct in_addr *)&remote->addr));
256     }
257     printlog(LOG_DEBUG,"SWIM: Processed PING packet\n");
258     return;
259 }
260
261 /** Receiving PING_ACK packet*/
262 static void swim_receive_pingack(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) {
263     // find the source which requested this ping and inform it with CHECK_ACK, ALIVE
264     // look up in the ping_targets list.
265     printlog(LOG_DEBUG, "SWIM: receiving the PING_ACK message from %s\n", inet_ntoa(*(struct in_addr *)&remote->addr));
266     flushlog();
267     int result, confirm;
268     int delete_head = 0;
269     ping_target_t* pointer;
270     ping_target_t* prev_pointer;
271     swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
272     pointer = swim_comm->ping_targets;
273     prev_pointer = swim_comm->ping_targets;
274
275 /*  
276  *  Removed this because if a PING_ACK arrives then CHECK_ACK would be
277  *  sent to all the nodes which requested a check on this node. Hence pointer
278  *  could be NULL but we could receive a CHECK_ACK packet
279  *  if (pointer == NULL) {
280         printlog(LOG_DEBUG, "SWIM: Received PING_ACK for a PING not sent\n");
281         return;
282     }
283 */    
284     while(pointer!=NULL) {
285         if(pointer->target.addr == remote->addr && pointer->target.port == remote->port) {
286             // suspect has been found in the list
287             // now construct the CHECK_ACK message and send it to source
288             message_t checkack_msg;
289             memset(&checkack_msg, 0, sizeof(message_t));
290             checkack_msg.magic = MAGIC_MSG;
291             checkack_msg.ident_id = msg->ident_id;
292             checkack_msg.value = 0;
293             checkack_msg.weight = 0;
294             checkack_msg.seqno = 0;
295             checkack_msg.min_seqno = 0;
296             checkack_msg.type = CHECK_ACK;
297             checkack_msg.checkack_value = ALIVE;
298             // inform the source of the addr and port of suspected node
299             checkack_msg.check_target = remote->addr;
300             checkack_msg.check_port = remote->port;
301             struct sockaddr_in toaddr;
302             memset(&toaddr, 0, sizeof(struct sockaddr_in));
303             // found source
304             toaddr.sin_family = AF_INET;
305             toaddr.sin_addr.s_addr = pointer->source.addr;
306             toaddr.sin_port = pointer->source.port;
307             message_to_nbo(&checkack_msg);
308
309             if (sendto(limiter.udp_socket, &checkack_msg, sizeof(checkack_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
310                 printlog(LOG_WARN, "WARN: swim_receive_pingack : sento failed.\n");
311                 result = errno;
312                 printlog(LOG_WARN, "  - The error was |%d|\n", strerror(result));
313             }
314
315             /** Now delete this suspect from friends list of nodes*/
316             if(prev_pointer == pointer) {
317                 swim_comm->ping_targets = pointer->next;
318                 pointer->next = NULL;
319                 free(pointer);
320                 pointer = swim_comm->ping_targets;
321                 delete_head = 1;
322             } else {
323                 prev_pointer->next = pointer->next;
324                 pointer->next = NULL;
325                 free(pointer);
326                 pointer = prev_pointer;
327             }
328             confirm = 1;
329         }
330         prev_pointer = pointer;
331         if(pointer != NULL && delete_head != 1) {
332             pointer = pointer->next;
333         }
334         delete_head = 0;
335         printf("SWIM: PING ACK\n");
336     }
337     // PING_ACK has been received then add to the list of updates
338     remote_node_t sender;
339     memset(&sender, 0, sizeof(remote_node_t));
340     sender.addr = remote->addr;
341     sender.port = remote->port;
342     update_t* new_update = (update_t *) malloc(sizeof(update_t));
343     memset(new_update, 0, sizeof(update_t));
344     new_update->remote = map_search(comm->remote_node_map, &sender, sizeof(remote_node_t));
345     if(new_update->remote == NULL) {
346         printlog(LOG_DEBUG, "SWIM: PANIC: PING_ACK received from an unknown node %s\n",inet_ntoa(*(struct in_addr *)&sender.addr));
347     }
348     new_update->count = 0;
349
350     if(msg->incarnation > new_update->remote->incarnation) {
351         new_update->remote->incarnation = msg->incarnation;
352         new_update->remote->reachability = REACHABLE;
353         new_update->remote->awol = 0;
354         new_update->remote->count_rounds = 0;
355         new_update->remote->count_awol = 0;
356         new_update->remote->count_alive = 0;
357         if( find_and_update(swim_comm->updates, new_update) == 0 ) {
358             swim_comm->updates = add_to_list(swim_comm->updates, new_update);
359             swim_comm->count_updates++;
360         }
361     } else if(msg->incarnation == new_update->remote->incarnation) {
362         // if the node previously thought that sender was down then it prevails
363         // else if the node thought it was up then there is no change
364     }
365
366     if(confirm != 1) printlog(LOG_DEBUG,"SWIM: PING_ACK did not match entries in list\n");
367     printlog(LOG_DEBUG,"SWIM: Processed PING_ACK packet\n");
368     return;
369 }
370
371 /** Receiving CHECK_ACK packet*/
372 static void swim_receive_checkack(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) {
373     printlog(LOG_DEBUG, "SWIM: receiving the CHECK_ACK message from %s\n", inet_ntoa(*(struct in_addr *)&remote->addr));
374     int i;
375     for( i = 0; i < comm->remote_node_count; i++) {
376         if(comm->remote_limiters[i].addr == msg->check_target && comm->remote_limiters[i].port == msg->check_port) {
377             if(msg->checkack_value == ALIVE)
378                 comm->remote_limiters[i].count_alive++;
379             else if (msg->checkack_value == AWOL)
380                 comm->remote_limiters[i].count_awol++;
381         }
382     }
383     printlog(LOG_DEBUG,"SWIM: Processed CHECK_ACK packet\n");
384     return;
385 }
386
387 static int swim_send(comm_t *comm, int id, int sock) {
388     int i, result;
389     swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
390
391     /**SOURCE: Send messages to friends to check on 
392      * nodes which are suspected to be down*/
393     for(i = 0; i < comm->remote_node_count; i++) {
394         printlog(LOG_DEBUG, "SWIM: AWOL count of %s is %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), comm->remote_limiters[i].awol);
395         if(comm->remote_limiters[i].awol == GOSSIP_REMOTE_AWOL_THRESHOLD) {
396             // HACK to make sure this part of code is entered only once
397             comm->remote_limiters[i].reachability = SUSPECT;
398             comm->remote_limiters[i].awol++;
399             help_from_friends(comm, i, id, sock);
400         }
401     }
402
403     /**SOURCE: Count number of rounds since the node has been suspected
404      * If in this process the count reaches threshold then take action
405      */
406     for (i = 0; i < comm->remote_node_count; i++) {
407         if(comm->remote_limiters[i].reachability == SUSPECT) {
408             comm->remote_limiters[i].count_rounds++;
409             printlog(LOG_DEBUG, "SWIM: ROUNDS count on %s index %d is %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), i, comm->remote_limiters[i].count_rounds);
410             if(comm->remote_limiters[i].count_rounds > SOURCE_THRESHOLD) {
411                 if(comm->remote_limiters[i].count_alive > 0) {
412                     printlog(LOG_DEBUG,"SWIM: the node %s was up, wrongly suspected\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr));
413                     comm->remote_limiters[i].reachability = REACHABLE;
414                     comm->remote_limiters[i].count_rounds = 0;
415                     comm->remote_limiters[i].count_awol = 0;
416                     comm->remote_limiters[i].count_alive = 0;
417                     // FIX
418                     comm->remote_limiters[i].awol = 0;
419                 }
420                 else if (comm->remote_limiters[i].count_awol > 0) {
421                     comm->remote_limiters[i].reachability = UNREACHABLE;
422                     update_t* new_update = (update_t *) malloc(sizeof(update_t));
423                     memset(new_update, 0, sizeof(update_t));
424                     new_update->remote = &comm->remote_limiters[i];
425                     new_update->count = 0;
426                     // comm->remote_limiters[i].incoming.seen_seqno = 0;
427                     printlog(LOG_DEBUG, "SWIM: INFECT: Update down information %s\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr));
428                     if(find_and_update(swim_comm->updates, new_update) == 0) {
429                         swim_comm->updates = add_to_list(swim_comm->updates, new_update);
430                         swim_comm->count_updates++;
431                     }
432                     printlog(LOG_DEBUG,"SWIM: The node %s is down. reachability: %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), comm->remote_limiters[i].reachability);// The node is check_list.target
433                 }
434                 else {
435                     /**Even friends have not responded, request for help from more friends?*/
436                     printlog(LOG_DEBUG,"SWIM: Last ditch effort, even friends did not respond\n");
437                     comm->remote_limiters[i].reachability = SUSPECT;
438                     comm->remote_limiters[i].count_rounds = 0; // CHECK
439                     help_from_friends(comm, i, id, sock);
440                 }
441             }
442         }
443     }
444
445     /**Actions performed by "FRIEND"*/
446     // DELETE THIS LOOP
447     ping_target_t* ping_list = swim_comm->ping_targets;
448     while(ping_list != NULL) {
449         printlog(LOG_DEBUG, "SWIM: in PING list %s\n", inet_ntoa(*(struct in_addr *)&ping_list->target.addr));
450         ping_list = ping_list->next;
451     }
452     ping_list = swim_comm->ping_targets;
453     ping_target_t* ping_list_prev = swim_comm->ping_targets;
454     int delete_head = 0;
455     while(ping_list != NULL) {
456         ping_list->count++;
457         printlog(LOG_DEBUG,"SWIM: friend keeping track of gossip rounds since PING.\n");
458         // if in this process some node hits threshold then
459         // send AWOL and delete it from this list
460         if(ping_list->count >= FRIEND_THRESHOLD) {
461             printlog(LOG_DEBUG,"SWIM: friend declaring AWOL.\n");
462             message_t checkack_msg;
463             memset(&checkack_msg, 0, sizeof(message_t));
464             checkack_msg.magic = MAGIC_MSG;
465             checkack_msg.ident_id = id;
466             checkack_msg.value = 0;
467             checkack_msg.weight = 0;
468             checkack_msg.seqno = 0;
469             checkack_msg.min_seqno = 0;
470             checkack_msg.type = CHECK_ACK;
471             checkack_msg.checkack_value = AWOL;
472             // inform the source of the addr and port of suspected node
473             checkack_msg.check_target = ping_list->target.addr;
474             checkack_msg.check_port = ping_list->target.port;
475             struct sockaddr_in toaddr;
476             memset(&toaddr, 0, sizeof(struct sockaddr_in));
477             toaddr.sin_family = AF_INET;
478             toaddr.sin_addr.s_addr = ping_list->source.addr;
479             toaddr.sin_port = ping_list->source.port;
480             message_to_nbo(&checkack_msg);
481
482             if (sendto(sock, &checkack_msg, sizeof(checkack_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
483                 printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
484                 result = errno;
485                 printlog(LOG_WARN, "  - The error was |%d|\n", strerror(result));
486             }
487             // now the deletion: after deletion we want to continue from next node
488             printlog(LOG_DEBUG, "SWIM: deleting from PING list %s\n", inet_ntoa(*(struct in_addr *)&ping_list->target.addr));
489             if(ping_list_prev == ping_list) {
490                 swim_comm->ping_targets = ping_list->next;
491                 ping_list->next = NULL;
492                 free(ping_list);
493                 ping_list = swim_comm->ping_targets;
494                 delete_head = 1;
495             } else {
496                 ping_list_prev->next = ping_list->next;
497                 ping_list->next = NULL;
498                 free(ping_list);
499                 ping_list = ping_list_prev;
500             }
501         }
502         ping_list_prev = ping_list;
503         if (ping_list != NULL && delete_head != 1) ping_list = ping_list->next;
504         delete_head = 0;
505     }
506
507     return 0;
508 }
509
510
511 /** Handle SWIM packets received*/
512 int swim_receive(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) {
513     swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
514
515     if (msg->type == ACK) {
516         /* If ACK was received then reset the awol count */
517         if (msg->seqno == remote->outgoing.next_seqno - 1) {
518             /* Ack for most recent message.  Clear saved state. */
519             remote->outgoing.first_seqno = remote->outgoing.next_seqno;
520             remote->outgoing.saved_value = 0;
521             remote->outgoing.saved_weight = 0;
522
523             remote->awol = 0;
524             remote->count_awol = 0;
525         }
526         /* Ignore ack if it isn't for most recent message. */
527     } else if (msg->type == MSG) {
528         if (msg->min_seqno > remote->incoming.seen_seqno) {
529             /* Entirely new information */
530             remote->incoming.seen_seqno = msg->seqno;
531             remote->incoming.saved_value = msg->value;
532             remote->incoming.saved_weight = msg->weight;
533             comm->gossip.value += msg->value;
534             comm->gossip.weight += msg->weight;
535             send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
536             remote->awol = 0;
537             remote->count_rounds = 0;
538             remote->count_awol = 0;
539             remote->count_alive = 0;
540
541             // check if there is an update piggy backed on this message
542             // if yes then add it to the update list
543             if(msg->update_present > 0) {
544                 update_t *new_update = (update_t *) malloc(sizeof(update_t));
545                 memset(new_update, 0, sizeof(update_t));
546                 // look for the remote limiter about whom update is being
547                 // sent the update node is sent in the message, msg->node!
548                 remote_limiter_t *temp_remote = map_search(comm->remote_node_map, &msg->node, sizeof(remote_node_t));
549                 // an update about the node itself is possible in which case map_search would fail
550                 if(temp_remote != NULL) {
551                     printlog(LOG_DEBUG, "SWIM: INFECT: Receiving update about %s. Update says %d\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr), msg->reachability);
552                     new_update->remote = temp_remote;
553                     new_update->count = 0;
554                     if(msg->incarnation > new_update->remote->incarnation) {
555                         printlog(LOG_DEBUG, "SWIM: INFECT: Receiving update about %s. Update is about new incarnation\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr));
556                         new_update->remote->reachability = msg->reachability;
557                         new_update->remote->incarnation = msg->incarnation;
558                         if(find_and_update(swim_comm->updates, new_update) == 0) {
559                             swim_comm->updates = add_to_list(swim_comm->updates, new_update);
560                             swim_comm->count_updates++;
561                         }
562                     } else if(msg->incarnation == new_update->remote->incarnation && new_update->remote->reachability == REACHABLE && msg->reachability == UNREACHABLE) {
563                         printlog(LOG_DEBUG, "SWIM: INFECT: Receiving update about %s. Update about same incarnation, says node unreachable\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr));
564                         new_update->remote->reachability = msg->reachability;
565                         if(find_and_update(swim_comm->updates, new_update) == 0) {
566                             swim_comm->updates = add_to_list(swim_comm->updates, new_update);
567                             swim_comm->count_updates++;
568                         } else {
569                             // Ignore the update
570                             printlog(LOG_DEBUG, "SWIM: INFECT: update about %s ignored\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr));
571                         }
572                     }
573                 }
574             }
575         } 
576         else if (msg->seqno > remote->incoming.seen_seqno) {
577             /* Only some of the message is old news. */
578             double diff_value = msg->value - remote->incoming.saved_value;
579             double diff_weight = msg->weight - remote->incoming.saved_weight;
580
581             remote->incoming.seen_seqno = msg->seqno;
582             remote->incoming.saved_value = msg->value;
583             remote->incoming.saved_weight = msg->weight;
584
585             comm->gossip.value += diff_value;
586             comm->gossip.weight += diff_weight;
587             send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
588             remote->awol = 0;
589             remote->count_awol = 0;
590         } 
591         else {
592             /* The entire message is old news. (Duplicate). */
593             /* Do nothing. */
594         }
595         // Hearing from a node previously declared unreachable
596         if(remote->reachability == UNREACHABLE) {
597             printlog(LOG_DEBUG, "SWIM: INFECT: seems like %s is back up\n", inet_ntoa(*(struct in_addr *)&remote->addr));
598             remote->reachability = SUSPECT;
599             remote->awol = GOSSIP_REMOTE_AWOL_THRESHOLD;
600             remote->count_rounds = 0;
601             remote->count_awol = 0;
602             remote->count_alive = 0;
603         }
604     }
605     else if(msg->type == CHECK) {
606         swim_receive_check(comm, sock, remote, msg);
607     }
608     else if(msg->type == PING ) {
609         swim_receive_ping(comm, sock, remote, msg);
610     }
611     else if(msg->type == PING_ACK) {
612         swim_receive_pingack(comm, sock, remote, msg);
613     }
614     else if(msg->type == CHECK_ACK) {
615         swim_receive_checkack(comm, sock, remote, msg);
616     }
617     return 0;
618 }
619
620 int send_gossip_swim(comm_t *comm, uint32_t id, int sock) {
621     int i, j;
622     int retry_index = 0;
623     int result = 0;
624     remote_limiter_t *remote;
625     struct sockaddr_in toaddr;
626     double msg_value, msg_weight;
627
628     /* This is the factor for the portion of value/weight to keep locally.
629      * Normally this is 1, meaning that we retain the same amount of value/weight
630      * that was sent to the peers.  In the case of not being able to send to a
631      * peer though, we increment this to reclaim the value/weight locally. */
632     int message_portion = 1;
633
634     memset(&toaddr, 0, sizeof(struct sockaddr_in));
635     toaddr.sin_family = AF_INET;
636
637     msg_value = comm->gossip.value / (comm->gossip.gossip_branch + 1);
638     msg_weight = comm->gossip.weight / (comm->gossip.gossip_branch + 1);
639
640     /*Nodes to which message was sent will have a non-zero here*/
641     /*    for (i = 0; i < comm->remote_node_count; i++) {
642           if(comm->remote_limiters[i].awol > 0)
643           comm->remote_limiters[i].awol++;
644           }*/
645
646     for (i = 0; i < comm->remote_node_count; i++) {
647         printlog(LOG_DEBUG, "SWIM: STATUS: Node: %s reachability: %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), comm->remote_limiters[i].reachability);
648     }
649
650     for (i = 0; i < comm->gossip.gossip_branch; ++i) {
651         int targetid = NULL_PEER;
652         int rand_count = 0;
653         message_t msg;
654
655         printlog(LOG_DEBUG, "Gossip loop iteration, i=%d, branch=%d\n", i, comm->gossip.gossip_branch);
656
657         /* If there are any peers with unacked messages, select them first. */
658         while (retry_index < comm->remote_node_count) {
659             if (comm->remote_limiters[retry_index].awol > 0 && comm->remote_limiters[retry_index].reachability == REACHABLE) {
660                 targetid = retry_index;
661                 printlog(LOG_DEBUG, "GOSSIP: Selected peerindex %d because it had unacked messages.\n", targetid);
662                 retry_index += 1;
663                 break;
664             }
665             retry_index += 1;
666         }
667
668         while (targetid == NULL_PEER && rand_count < 50) {
669             /* *Gossip node would be chosen from
670             * the array which would be randomly shuffled
671             * once all the nodes have been sent messages*
672             */
673             targetid = find_gossip_target(comm);
674             /* If we didn't find any peers the needed retransmissions, select one randomly here. */
675 /*                targetid = myrand() % comm->remote_node_count;
676                 rand_count += 1;
677 */
678             /* Don't select an already-used index. */
679             for (j = 0; j < i; ++j) {
680                 if (targetid == comm->selected[j]) {
681                     printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d.  selected[j=%d] is %d\n", targetid, j, comm->selected[j]); 
682                     targetid = NULL_PEER;
683                     break;
684                 }
685             }
686             if (targetid != NULL_PEER) {
687                 if(comm->remote_limiters[targetid].reachability != REACHABLE) {
688                     printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d.  reachability is %d, and remote awol count is %d\n",
689                             targetid, comm->remote_limiters[targetid].reachability, comm->remote_limiters[targetid].awol);
690                     targetid = NULL_PEER;
691                 }
692             }
693             
694             rand_count++;
695         }
696
697         if (targetid < 0) {
698             /* Couldn't find a suitable peer to send to... */
699             message_portion += 1;
700             printlog(LOG_DEBUG, "GOSSIP: exhausted random peer search.\n");
701             continue;
702         } else {
703             printlog(LOG_DEBUG, "GOSSIP: settled on peer id %d.\n", targetid);
704         }
705
706         remote = &comm->remote_limiters[targetid];
707         comm->selected[i] = targetid;
708
709         toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
710         toaddr.sin_port = remote->port;
711
712         memset(&msg, 0, sizeof(message_t));
713         msg.magic = MAGIC_MSG;
714         msg.ident_id = id;
715         msg.value = msg_value + remote->outgoing.saved_value;
716         msg.weight = msg_weight + remote->outgoing.saved_weight;
717         msg.seqno = remote->outgoing.next_seqno;
718         msg.min_seqno = remote->outgoing.first_seqno;
719         msg.type = MSG;
720         msg.view = comm->gossip.view;
721         if(comm->gossip.membership == SWIM) {
722             swim_comm_t *swim_comm = (swim_comm_t *)comm->membership_state;
723             // piggy back an update
724             if(swim_comm->count_updates > 0) {
725                 int index = myrand() % swim_comm->count_updates;
726                 update_t *pointer = swim_comm->updates;
727                 update_t *prev_pointer = swim_comm->updates;
728                 while(index != 0) {
729                     prev_pointer = pointer;
730                     pointer = pointer->next;
731                     index--;
732                 }
733                 msg.update_present = TRUE;
734                 msg.reachability = pointer->remote->reachability;
735                 msg.incarnation = pointer->remote->incarnation;
736                 msg.node.addr = pointer->remote->addr;
737                 msg.node.port = pointer->remote->port;
738                 printlog(LOG_DEBUG, "SWIM: Sending update about %s\n", inet_ntoa(*(struct in_addr *)&pointer->remote->addr));
739                 pointer->count++;
740                 if(swim_comm->updates != pointer && pointer->count == UPDATE_THRESHOLD) {
741                     // pointer is not head
742                     prev_pointer->next = pointer->next;
743                     pointer->next = NULL;
744                     free(pointer);
745                     swim_comm->count_updates--;
746                 } else if(pointer->count == UPDATE_THRESHOLD) {
747                     // pointer is head of the list
748                     swim_comm->updates = pointer->next;
749                     pointer->next = NULL;
750                     free(pointer);
751                     swim_comm->count_updates--;
752                     if(swim_comm->count_updates == 0) {
753                         swim_comm->updates = NULL;
754                     }
755                 }
756             }
757         }
758         remote->outgoing.next_seqno++;
759         remote->outgoing.saved_value += msg_value;
760         remote->outgoing.saved_weight += msg_weight;
761         /* Represents the number of messages we have sent to this peer without hearing a message or ACK from it. */
762         remote->awol += 1;
763
764
765 #ifdef ALLOW_PARTITION
766
767         if (do_partition && ((partition_set & (1 << targetid)) == 0)) {
768             printlog(LOG_DEBUG, "Partition: GOSSIP: ignoring targetid %d\n", targetid);
769             continue;
770         }
771
772 #endif
773
774         message_to_nbo(&msg);
775
776         if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
777             printlog(LOG_WARN, "WARN: limiter_send_gossip: sento failed.\n");
778             result = errno;
779             break;
780         }
781         printlog(LOG_DEBUG,"SWIM: sent the gossip to %s\n", inet_ntoa(*(struct in_addr *)&toaddr.sin_addr.s_addr));
782     }
783     swim_send(comm, id, sock);
784     comm->gossip.value = msg_value * message_portion;
785     comm->gossip.weight = msg_weight * message_portion;
786
787     return result;
788 }
789
790 void swim_restart(comm_t *comm, int32_t view_number) {
791     /* Not sure about this yet... */
792 }
793
794 int swim_init(comm_t *comm, uint32_t id) {
795     comm->membership_state = malloc(sizeof(swim_comm_t));
796     if (comm->membership_state == NULL) {
797         return ENOMEM;
798     }
799     comm->connected = 1;
800     comm->recv_function = swim_receive;
801     comm->send_function = send_gossip_swim;
802     comm->restart_function = swim_restart;
803
804     swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
805
806     FILE *fp = fopen("/root/incarnation", "w+");
807     fscanf(fp, "%d", &swim_comm->incarnation);
808     fprintf(fp, "%d", swim_comm->incarnation + 1); // next time a greater incarnation would be read
809     fflush(fp);
810     fclose(fp);
811
812     return 0;
813 }
814
815 void swim_teardown(comm_t *comm) {
816     if (comm->membership_state)
817         free(comm->membership_state);
818 }
819
820 /**Bhanu: new functions being introduced*/
821 /** Given a friend_id and the address of the suspected node
822  * this function sends a CHECK message to friend */
823 /*
824    int help_from_friend(comm_t* comm, int friend_id, in_addr_t addr, in_port_t port, uint32_t id, int sock) {
825 // send a CHECK message to friend
826 int result;
827 message_t msg;
828 memset(&msg, 0, sizeof(message_t));
829 msg.magic = MAGIC_MSG;
830 msg.ident_id = id;
831 msg.value = 0;
832 msg.weight = 0;
833 msg.seqno = 0;
834 msg.min_seqno = 0;
835 msg.type = CHECK;
836 msg.check_target = addr;
837 msg.check_port = port;
838
839 // send the message
840 struct sockaddr_in toaddr;
841 memset(&toaddr, 0, sizeof(sockaddr_in));
842 toaddr.sin_family = AF_INET;
843 toaddr.sin_addr.s_addr = comm->remote_limiters[friend_id].addr;
844 toaddr.sin_port = comm->remote_limiters[friend_id].port;
845 message_to_nbo(&msg);
846
847 if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
848 printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
849 result = errno;
850 printlog(LOG_WARN, "  - The error was |%d|\n", strerror(result));
851 }
852 }
853 */