1 /* See the DRL-LICENSE file for this file's software license. */
9 /* Socket functions. */
10 #include <sys/types.h>
11 #include <sys/socket.h>
13 /* Byte ordering and address structures. */
14 #include <arpa/inet.h>
19 #include "raterouter.h"
20 #include "ratetypes.h"
21 #include "drl_state.h"
22 #include "peer_comm.h"
26 /* From ulogd_DRL.c */
27 extern int do_partition;
28 extern int partition_set;
30 extern limiter_t limiter;
32 /**Finds the update, if found then frees the memory of the new_update
33 * and returns 1. If find fails then this returns 0*/
34 static int find_and_update(update_t *updates, update_t *new_update) {
35 if( updates == NULL ) {
36 printlog(LOG_DEBUG, "SWIM: INFECT: no existing updates\n");
39 update_t *pointer = updates;
40 while(pointer != NULL) {
41 if(pointer->remote == new_update->remote && pointer->remote->incarnation >= new_update->remote->incarnation) {
43 printlog(LOG_DEBUG, "SWIM: INFECT: update already exists\n");
47 pointer = pointer->next;
49 printlog(LOG_DEBUG, "SWIM: INFECT: update not found among existing updates\n");
53 /*Just adds to the end of list and returns the head*/
54 update_t *add_to_list(update_t *updates, update_t *new_update) {
55 printlog(LOG_DEBUG, "SWIM: INFECT: adding to list of updates: %s is %d\n", inet_ntoa(*(struct in_addr *)&new_update->remote->addr), new_update->remote->reachability);
56 update_t *head = updates;
62 while(pointer->next != NULL) {
63 pointer = pointer->next;
65 pointer->next = new_update;
70 /** Given the address of the suspected node this function
71 * identifies friends who can probe the suspected node
72 * After recording these, the messages for help are sent
73 * in the next gossip round*/
// Asks other peers ("friends") to probe a suspected node by sending each of
// them a CHECK message that names the suspect's address and port.
//   comm          - communication state; its remote_limiters table and
//                   remote_node_count are read here.
//   suspect_index - index into comm->remote_limiters of the suspected node.
//   id            - value copied into the ident_id field of each CHECK.
//   sock          - socket descriptor passed to sendto().
// NOTE(review): this listing omits lines of the function (the declarations of
// i, j, check_msg and result, the loop increments, the return statement and
// the closing braces). The notes below describe only the visible statements.
74 static int help_from_friends(comm_t *comm, int suspect_index, uint32_t id, int sock) {
75 printlog(LOG_DEBUG,"SWIM: In function help_from_friends suspected node: %s, index: %d\n",inet_ntoa(*(struct in_addr *)&comm->remote_limiters[suspect_index].addr), suspect_index);
// Contact at most MAX_FRIENDS peers, or every known peer when fewer exist.
78 int count_friends = (comm->remote_node_count > MAX_FRIENDS ) ? MAX_FRIENDS : comm->remote_node_count; // A more logical way?
79 // remote_node_t friend_nodes[count_friends];
80 // int friend_ids[count_friends];
// i indexes the peer table. Presumably j counts peers that were skipped, so
// that (i - j) is the number of friends contacted so far -- TODO confirm
// against the lines missing from this listing.
82 while( (i - j) < count_friends && i < comm->remote_node_count ) {
83 // Do not pick a friend who is suspected to be down
84 if (comm->remote_limiters[i].reachability != REACHABLE) {
88 /**construct the message and send it to friend
91 memset(&check_msg, 0, sizeof(message_t));
92 check_msg.magic = MAGIC_MSG;
93 check_msg.ident_id = id;
97 check_msg.min_seqno = 0;
98 check_msg.type = CHECK;
// The CHECK carries the suspect's address and port so the friend knows whom
// to probe.
99 check_msg.check_target = comm->remote_limiters[suspect_index].addr;
100 check_msg.check_port = comm->remote_limiters[suspect_index].port;
// Destination is the friend at index i; addr and port are copied as stored.
103 struct sockaddr_in toaddr;
104 memset(&toaddr, 0, sizeof(struct sockaddr_in));
105 toaddr.sin_family = AF_INET;
106 toaddr.sin_addr.s_addr = comm->remote_limiters[i].addr;
107 toaddr.sin_port = comm->remote_limiters[i].port;
// Presumably converts the message fields to network byte order (inferred from
// the helper's name; its definition is not in this file).
108 message_to_nbo(&check_msg);
110 printlog(LOG_DEBUG,"SWIM: Sending CHECK message to friend %s i: %d", inet_ntoa(*(struct in_addr *)&toaddr.sin_addr.s_addr), i);
111 printlog(LOG_DEBUG," Suspect: %s", inet_ntoa(*(struct in_addr *)&check_msg.check_target));
112 printlog(LOG_DEBUG," Initial: %s suspect_index: %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[suspect_index].addr), suspect_index);
113 if (sendto(sock, &check_msg, sizeof(check_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
114 printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
// NOTE(review): "%d" is paired with the char pointer returned by strerror();
// the conversion should be "%s". The log prefix also names limiter_send_mesh
// although this is help_from_friends.
116 printlog(LOG_WARN, " - The error was |%d|\n", strerror(result));
120 printlog(LOG_DEBUG,"SWIM: Out function help_from_friends.\n");
124 /** Receiving CHECK packet*/
// Handles a CHECK request: the peer `remote` asks this node to probe the node
// named by msg->check_target / msg->check_port on its behalf.
// Visible steps: build a PING that records the requester in ping_source and
// ping_port, append a (target, requester) record to the tail of
// swim_comm->ping_targets, then send the PING to the target.
//   comm   - communication state; membership_state is read as swim_comm_t.
//   sock   - not used by any visible statement (the send uses
//            limiter.udp_socket instead).
//   remote - sender of the CHECK message.
//   msg    - the received CHECK message.
// NOTE(review): this listing omits lines of the function (local declarations,
// early exits, closing braces); notes cover only the visible statements.
125 static void swim_receive_check(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) {
// Partition testing: find the sender's index in the peer table and, when
// do_partition is set and the sender's bit is clear in partition_set, log the
// message as ignored. The early exit and the matching #endif are not visible.
// NOTE(review): (1 << id) is only defined for id below the bit width of int.
127 #ifdef ALLOW_PARTITION
129 for(id = 0; id < comm->remote_node_count; id++) {
130 if(comm->remote_limiters[id].addr == remote->addr && comm->remote_limiters[id].port == remote->port) {
131 if (do_partition && ((partition_set & (1 << id)) == 0)) {
132 printlog(LOG_DEBUG, "SWIM: Ignoring CHECK message from %d\n", id);
// NOTE(review): the statement guarded by this test is not visible; presumably
// an early return for requesters not currently REACHABLE -- confirm.
140 if(remote->reachability != REACHABLE)
143 swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
145 // create the message that has to be sent to the suspected node
146 printlog(LOG_DEBUG,"SWIM: received CHECK message from %s", inet_ntoa(*(struct in_addr *)&remote->addr));
147 printlog(LOG_DEBUG,", sending PING to %s\n", inet_ntoa(*(struct in_addr *)&msg->check_target));
150 memset(&ping_msg, 0, sizeof(message_t));
151 ping_msg.magic = MAGIC_MSG;
// The requester's ident_id is forwarded unchanged in the PING.
152 ping_msg.ident_id = msg->ident_id;
156 ping_msg.min_seqno = 0;
157 ping_msg.type = PING;
// Record who asked for the probe inside the PING itself.
158 ping_msg.ping_source = remote->addr;
159 ping_msg.ping_port = remote->port;
160 // send the ping message
161 struct sockaddr_in toaddr;
162 memset(&toaddr, 0, sizeof(struct sockaddr_in));
163 toaddr.sin_family = AF_INET;
164 toaddr.sin_addr.s_addr = msg->check_target;
165 toaddr.sin_port = msg->check_port;
166 message_to_nbo(&ping_msg);
168 /** add to ping targets before sending message */
// NOTE(review): the malloc() result is used by memset() without a NULL check.
169 ping_target_t *suspect = (ping_target_t*) malloc(sizeof(ping_target_t));
170 memset(suspect, 0, sizeof(ping_target_t));
171 suspect->target.addr = msg->check_target;
172 suspect->target.port = msg->check_port;
173 suspect->source.addr = remote->addr;
174 suspect->source.port = remote->port;
175 printlog(LOG_DEBUG, "SWIM: adding %s to PING list\n", inet_ntoa(*(struct in_addr *)&suspect->target.addr));
// Non-empty list: walk to the last node and link suspect after it. The
// assignment to swim_comm->ping_targets below is presumably the else branch
// for an empty list (the else keyword is on a line missing from this listing).
177 ping_target_t *pointer = swim_comm->ping_targets;
178 if( swim_comm->ping_targets != NULL ) {
179 while(pointer->next != NULL) {
180 pointer = pointer->next;
182 pointer->next = suspect;
185 swim_comm->ping_targets = suspect;
187 /** added to the end of the list of ping_targets */
// The PING is sent on the global limiter.udp_socket, not on the sock argument.
189 if (sendto(limiter.udp_socket, &ping_msg, sizeof(ping_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
190 printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
// NOTE(review): "%d" is paired with the char pointer returned by strerror();
// the conversion should be "%s".
192 printlog(LOG_WARN, " - The error was |%d|\n", strerror(result));
194 printlog(LOG_DEBUG,"SWIM: Sent PING message\n");
196 printlog(LOG_DEBUG,"SWIM: Processed CHECK packet\n");
200 /** Receiving PING packet*/
// Handles a PING: replies to the sender with a PING_ACK that advertises this
// node as REACHABLE together with a freshly incremented incarnation number.
//   comm   - communication state; membership_state is read as swim_comm_t.
//   sock   - not used by any visible statement (the send uses
//            limiter.udp_socket instead).
//   remote - sender of the PING; the PING_ACK is addressed to it.
//   msg    - the received PING; its ident_id is echoed in the reply.
// NOTE(review): this listing omits lines of the function (local declarations,
// early exits, closing braces); notes cover only the visible statements.
201 static void swim_receive_ping(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) {
// Partition testing: when do_partition is set and the sender's bit is clear in
// partition_set the PING is logged as ignored. The early exit and the matching
// #endif are not visible.
// NOTE(review): (1 << id) is only defined for id below the bit width of int.
203 #ifdef ALLOW_PARTITION
205 for(id = 0; id < comm->remote_node_count; id++) {
206 if(comm->remote_limiters[id].addr == remote->addr && comm->remote_limiters[id].port == remote->port) {
207 if (do_partition && ((partition_set & (1 << id)) == 0)) {
208 printlog(LOG_DEBUG, "SWIM: Ignoring PING message from %d\n", id);
215 printlog(LOG_DEBUG,"SWIM: receiving the PING message from %s\n", inet_ntoa(*(struct in_addr *)&remote->addr));
// NOTE(review): the statement guarded by this test is not visible; confirm
// what happens for senders that are not currently REACHABLE.
218 if(remote->reachability != REACHABLE)
222 swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
224 message_t pingack_msg;
225 memset(&pingack_msg, 0, sizeof(message_t));
226 pingack_msg.magic = MAGIC_MSG;
227 pingack_msg.ident_id = msg->ident_id;
228 pingack_msg.value = 0;
229 pingack_msg.weight = 0;
230 pingack_msg.seqno = 0;
231 pingack_msg.min_seqno = 0;
232 pingack_msg.type = PING_ACK;
// Each answered PING bumps the local incarnation; the new value travels in the
// PING_ACK as a piggy-backed update saying this node is REACHABLE.
234 swim_comm->incarnation++;
235 pingack_msg.update_present = TRUE;
236 pingack_msg.reachability = REACHABLE;
237 pingack_msg.incarnation = swim_comm->incarnation;
// Persist incarnation + 1 so a later start reads a larger value.
// NOTE(review): the fopen() result is passed to fprintf() without a NULL
// check, and no fclose() is visible in this listing.
238 FILE *fp = fopen("/root/incarnation", "w+");
239 fprintf(fp, "%d", swim_comm->incarnation + 1);
243 struct sockaddr_in toaddr;
244 memset(&toaddr, 0, sizeof(struct sockaddr_in));
245 toaddr.sin_family = AF_INET;
246 toaddr.sin_addr.s_addr = remote->addr;
247 toaddr.sin_port = remote->port;
248 message_to_nbo(&pingack_msg);
250 if (sendto(limiter.udp_socket, &pingack_msg, sizeof(pingack_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
251 printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
// NOTE(review): "%d" is paired with the char pointer returned by strerror();
// the conversion should be "%s".
253 printlog(LOG_WARN, " - The error was |%d|\n", strerror(result));
255 printlog(LOG_DEBUG, "SWIM: sent PING_ACK to %s\n", inet_ntoa(*(struct in_addr *)&remote->addr));
257 printlog(LOG_DEBUG,"SWIM: Processed PING packet\n");
261 /** Receiving PING_ACK packet*/
// Handles a PING_ACK from `remote`. Two visible phases:
//  1. For every entry in swim_comm->ping_targets whose target equals the
//     sender, send CHECK_ACK(ALIVE) to the entry's recorded source and unlink
//     the entry from the list.
//  2. Look the sender up in comm->remote_node_map and, if the acknowledged
//     incarnation is strictly newer than the stored one, mark the sender
//     REACHABLE, reset its counters and queue an update for dissemination.
//   comm   - communication state (peer map, membership_state).
//   sock   - not used by any visible statement.
//   remote - sender of the PING_ACK.
//   msg    - the received PING_ACK (ident_id and incarnation are read).
// NOTE(review): this listing omits lines of the function (declarations of
// result, delete_head and confirm, any free() of unlinked entries, else
// keywords, closing braces); notes cover only the visible statements.
262 static void swim_receive_pingack(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) {
263 // find the source which requested this ping and inform it with CHECK_ACK, ALIVE
264 // look up in the ping_targets list.
265 printlog(LOG_DEBUG, "SWIM: receiving the PING_ACK message from %s\n", inet_ntoa(*(struct in_addr *)&remote->addr));
269 ping_target_t* pointer;
270 ping_target_t* prev_pointer;
271 swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
// Both cursors start at the head; prev_pointer == pointer is later used as the
// test for "pointer is the head of the list".
272 pointer = swim_comm->ping_targets;
273 prev_pointer = swim_comm->ping_targets;
276 * Removed this because if a PING_ACK arrives then CHECK_ACK would be
277 * sent to all the nodes which requested a check on this node. Hence pointer
278 * could be NULL but we could receive a CHECK_ACK packet
279 * if (pointer == NULL) {
280 printlog(LOG_DEBUG, "SWIM: Received PING_ACK for a PING not sent\n");
284 while(pointer!=NULL) {
285 if(pointer->target.addr == remote->addr && pointer->target.port == remote->port) {
286 // suspect has been found in the list
287 // now construct the CHECK_ACK message and send it to source
288 message_t checkack_msg;
289 memset(&checkack_msg, 0, sizeof(message_t));
290 checkack_msg.magic = MAGIC_MSG;
291 checkack_msg.ident_id = msg->ident_id;
292 checkack_msg.value = 0;
293 checkack_msg.weight = 0;
294 checkack_msg.seqno = 0;
295 checkack_msg.min_seqno = 0;
296 checkack_msg.type = CHECK_ACK;
297 checkack_msg.checkack_value = ALIVE;
298 // inform the source of the addr and port of suspected node
299 checkack_msg.check_target = remote->addr;
300 checkack_msg.check_port = remote->port;
301 struct sockaddr_in toaddr;
302 memset(&toaddr, 0, sizeof(struct sockaddr_in));
// The reply goes to whoever originally sent the CHECK for this target.
304 toaddr.sin_family = AF_INET;
305 toaddr.sin_addr.s_addr = pointer->source.addr;
306 toaddr.sin_port = pointer->source.port;
307 message_to_nbo(&checkack_msg);
309 if (sendto(limiter.udp_socket, &checkack_msg, sizeof(checkack_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
310 printlog(LOG_WARN, "WARN: swim_receive_pingack : sento failed.\n");
// NOTE(review): "%d" is paired with the char pointer returned by strerror();
// the conversion should be "%s".
312 printlog(LOG_WARN, " - The error was |%d|\n", strerror(result));
315 /** Now delete this suspect from friends list of nodes*/
// Head removal: advance the list head and restart the cursor from the new
// head. Otherwise splice the entry out and step the cursor back to the
// predecessor. Whether the unlinked entry is freed, and where delete_head is
// set, is on lines missing from this listing.
316 if(prev_pointer == pointer) {
317 swim_comm->ping_targets = pointer->next;
318 pointer->next = NULL;
320 pointer = swim_comm->ping_targets;
323 prev_pointer->next = pointer->next;
324 pointer->next = NULL;
326 pointer = prev_pointer;
// Advance the cursor unless the head was just deleted, in which case pointer
// already refers to the next unvisited entry.
330 prev_pointer = pointer;
331 if(pointer != NULL && delete_head != 1) {
332 pointer = pointer->next;
// NOTE(review): this message uses printf() while the rest of the file logs
// through printlog().
335 printf("SWIM: PING ACK\n");
337 // PING_ACK has been received then add to the list of updates
// A zeroed remote_node_t holding only addr and port is the lookup key.
338 remote_node_t sender;
339 memset(&sender, 0, sizeof(remote_node_t));
340 sender.addr = remote->addr;
341 sender.port = remote->port;
// NOTE(review): the malloc() result is used by memset() without a NULL check.
342 update_t* new_update = (update_t *) malloc(sizeof(update_t));
343 memset(new_update, 0, sizeof(update_t));
344 new_update->remote = map_search(comm->remote_node_map, &sender, sizeof(remote_node_t));
// NOTE(review): what follows this log on the unknown-node path is not visible.
// If control continues, the new_update->remote dereferences below would act on
// a NULL pointer -- confirm there is an early return that also frees
// new_update.
345 if(new_update->remote == NULL) {
346 printlog(LOG_DEBUG, "SWIM: PANIC: PING_ACK received from an unknown node %s\n",inet_ntoa(*(struct in_addr *)&sender.addr));
348 new_update->count = 0;
// Strictly newer incarnation: accept the sender as REACHABLE, clear all of its
// suspicion counters and queue the update unless find_and_update() reports
// that an equivalent one is already pending (in which case it frees
// new_update, per that function's comment).
350 if(msg->incarnation > new_update->remote->incarnation) {
351 new_update->remote->incarnation = msg->incarnation;
352 new_update->remote->reachability = REACHABLE;
353 new_update->remote->awol = 0;
354 new_update->remote->count_rounds = 0;
355 new_update->remote->count_awol = 0;
356 new_update->remote->count_alive = 0;
357 if( find_and_update(swim_comm->updates, new_update) == 0 ) {
358 swim_comm->updates = add_to_list(swim_comm->updates, new_update);
359 swim_comm->count_updates++;
// NOTE(review): no visible statement releases new_update on the paths where
// the incarnation is not newer -- confirm it is freed on the missing lines.
361 } else if(msg->incarnation == new_update->remote->incarnation) {
362 // if the node previously thought that sender was down then it prevails
363 // else if the node thought it was up then there is no change
// confirm is assigned on lines not visible here; presumably set to 1 when a
// ping_targets entry matched the sender -- TODO verify.
366 if(confirm != 1) printlog(LOG_DEBUG,"SWIM: PING_ACK did not match entries in list\n");
367 printlog(LOG_DEBUG,"SWIM: Processed PING_ACK packet\n");
371 /** Receiving CHECK_ACK packet*/
372 static void swim_receive_checkack(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) {
373 printlog(LOG_DEBUG, "SWIM: receiving the CHECK_ACK message from %s\n", inet_ntoa(*(struct in_addr *)&remote->addr));
375 for( i = 0; i < comm->remote_node_count; i++) {
376 if(comm->remote_limiters[i].addr == msg->check_target && comm->remote_limiters[i].port == msg->check_port) {
377 if(msg->checkack_value == ALIVE)
378 comm->remote_limiters[i].count_alive++;
379 else if (msg->checkack_value == AWOL)
380 comm->remote_limiters[i].count_awol++;
383 printlog(LOG_DEBUG,"SWIM: Processed CHECK_ACK packet\n");
// Per-round SWIM housekeeping, called from send_gossip_swim(). Three phases
// are visible:
//  1. Peers whose awol count equals GOSSIP_REMOTE_AWOL_THRESHOLD become
//     SUSPECT and friends are asked to probe them.
//  2. Every SUSPECT peer ages by one round; once count_rounds exceeds
//     SOURCE_THRESHOLD the suspicion is resolved from the friends' verdicts
//     (count_alive, count_awol) or another call for help goes out.
//  3. Acting as a friend: ping_targets entries whose count has reached
//     FRIEND_THRESHOLD get a CHECK_ACK(AWOL) sent to their source and are
//     removed from the list.
//   comm - communication state (peer table, membership_state).
//   id   - value placed in ident_id of outgoing messages.
//   sock - socket used for the CHECK_ACK sends and passed to help_from_friends.
// NOTE(review): this listing omits lines of the function (declarations of i,
// result and delete_head, else keywords, the count increment, any free() of
// removed entries, the return statement, closing braces); notes cover only
// the visible statements.
387 static int swim_send(comm_t *comm, int id, int sock) {
389 swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
391 /**SOURCE: Send messages to friends to check on
392 * nodes which are suspected to be down*/
393 for(i = 0; i < comm->remote_node_count; i++) {
394 printlog(LOG_DEBUG, "SWIM: AWOL count of %s is %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), comm->remote_limiters[i].awol);
395 if(comm->remote_limiters[i].awol == GOSSIP_REMOTE_AWOL_THRESHOLD) {
396 // HACK to make sure this part of code is entered only once
// Bumping awol past the threshold makes the equality test above fail on
// later rounds.
397 comm->remote_limiters[i].reachability = SUSPECT;
398 comm->remote_limiters[i].awol++;
399 help_from_friends(comm, i, id, sock);
403 /**SOURCE: Count number of rounds since the node has been suspected
404 * If in this process the count reaches threshold then take action
406 for (i = 0; i < comm->remote_node_count; i++) {
407 if(comm->remote_limiters[i].reachability == SUSPECT) {
408 comm->remote_limiters[i].count_rounds++;
409 printlog(LOG_DEBUG, "SWIM: ROUNDS count on %s index %d is %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), i, comm->remote_limiters[i].count_rounds);
410 if(comm->remote_limiters[i].count_rounds > SOURCE_THRESHOLD) {
// Any ALIVE verdict wins: the peer is restored to REACHABLE and all suspicion
// counters, including awol, are cleared.
411 if(comm->remote_limiters[i].count_alive > 0) {
412 printlog(LOG_DEBUG,"SWIM: the node %s was up, wrongly suspected\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr));
413 comm->remote_limiters[i].reachability = REACHABLE;
414 comm->remote_limiters[i].count_rounds = 0;
415 comm->remote_limiters[i].count_awol = 0;
416 comm->remote_limiters[i].count_alive = 0;
418 comm->remote_limiters[i].awol = 0;
// No ALIVE verdict but at least one AWOL verdict: declare the peer UNREACHABLE
// and queue that fact for dissemination unless an equivalent update is already
// pending.
420 else if (comm->remote_limiters[i].count_awol > 0) {
421 comm->remote_limiters[i].reachability = UNREACHABLE;
// NOTE(review): the malloc() result is used by memset() without a NULL check.
422 update_t* new_update = (update_t *) malloc(sizeof(update_t));
423 memset(new_update, 0, sizeof(update_t));
424 new_update->remote = &comm->remote_limiters[i];
425 new_update->count = 0;
426 // comm->remote_limiters[i].incoming.seen_seqno = 0;
427 printlog(LOG_DEBUG, "SWIM: INFECT: Update down information %s\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr));
428 if(find_and_update(swim_comm->updates, new_update) == 0) {
429 swim_comm->updates = add_to_list(swim_comm->updates, new_update);
430 swim_comm->count_updates++;
432 printlog(LOG_DEBUG,"SWIM: The node %s is down. reachability: %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), comm->remote_limiters[i].reachability);// The node is check_list.target
// Presumably the final else branch (no verdict of either kind); the else
// keyword is on a line missing from this listing. The round counter restarts
// and a new set of CHECK requests is sent.
435 /**Even friends have not responded, request for help from more friends?*/
436 printlog(LOG_DEBUG,"SWIM: Last ditch effort, even friends did not respond\n");
437 comm->remote_limiters[i].reachability = SUSPECT;
438 comm->remote_limiters[i].count_rounds = 0; // CHECK
439 help_from_friends(comm, i, id, sock);
445 /**Actions performed by "FRIEND"*/
// First pass only logs the current contents of the PING list.
447 ping_target_t* ping_list = swim_comm->ping_targets;
448 while(ping_list != NULL) {
449 printlog(LOG_DEBUG, "SWIM: in PING list %s\n", inet_ntoa(*(struct in_addr *)&ping_list->target.addr));
450 ping_list = ping_list->next;
// Second pass: both cursors start at the head; ping_list_prev == ping_list is
// later used as the test for "ping_list is the head".
452 ping_list = swim_comm->ping_targets;
453 ping_target_t* ping_list_prev = swim_comm->ping_targets;
455 while(ping_list != NULL) {
// NOTE(review): the statement that advances ping_list->count is not visible in
// this listing; presumably it is incremented once per round here -- confirm.
457 printlog(LOG_DEBUG,"SWIM: friend keeping track of gossip rounds since PING.\n");
458 // if in this process some node hits threshold then
459 // send AWOL and delete it from this list
460 if(ping_list->count >= FRIEND_THRESHOLD) {
461 printlog(LOG_DEBUG,"SWIM: friend declaring AWOL.\n");
462 message_t checkack_msg;
463 memset(&checkack_msg, 0, sizeof(message_t));
464 checkack_msg.magic = MAGIC_MSG;
465 checkack_msg.ident_id = id;
466 checkack_msg.value = 0;
467 checkack_msg.weight = 0;
468 checkack_msg.seqno = 0;
469 checkack_msg.min_seqno = 0;
470 checkack_msg.type = CHECK_ACK;
471 checkack_msg.checkack_value = AWOL;
472 // inform the source of the addr and port of suspected node
473 checkack_msg.check_target = ping_list->target.addr;
474 checkack_msg.check_port = ping_list->target.port;
475 struct sockaddr_in toaddr;
476 memset(&toaddr, 0, sizeof(struct sockaddr_in));
477 toaddr.sin_family = AF_INET;
478 toaddr.sin_addr.s_addr = ping_list->source.addr;
479 toaddr.sin_port = ping_list->source.port;
480 message_to_nbo(&checkack_msg);
482 if (sendto(sock, &checkack_msg, sizeof(checkack_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
483 printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
// NOTE(review): "%d" is paired with the char pointer returned by strerror();
// the conversion should be "%s".
485 printlog(LOG_WARN, " - The error was |%d|\n", strerror(result));
487 // now the deletion: after deletion we want to continue from next node
488 printlog(LOG_DEBUG, "SWIM: deleting from PING list %s\n", inet_ntoa(*(struct in_addr *)&ping_list->target.addr));
// Head removal restarts the cursor at the new head; otherwise the entry is
// spliced out and the cursor steps back to its predecessor. Whether the
// removed entry is freed, and where delete_head is set, is not visible.
489 if(ping_list_prev == ping_list) {
490 swim_comm->ping_targets = ping_list->next;
491 ping_list->next = NULL;
493 ping_list = swim_comm->ping_targets;
496 ping_list_prev->next = ping_list->next;
497 ping_list->next = NULL;
499 ping_list = ping_list_prev;
// Advance unless the head was just deleted, in which case ping_list already
// refers to the next unvisited entry.
502 ping_list_prev = ping_list;
503 if (ping_list != NULL && delete_head != 1) ping_list = ping_list->next;
511 /** Handle SWIM packets received*/
// Entry point for every received message; installed as comm->recv_function by
// swim_init(). Dispatches on msg->type:
//   ACK        - clears saved outgoing state when it acknowledges the most
//                recent message sent to `remote`.
//   MSG        - folds the gossiped value and weight into comm->gossip, sends
//                an ACK, and processes any piggy-backed membership update.
//   CHECK, PING, PING_ACK, CHECK_ACK - delegated to the swim_receive_* helpers.
//   comm   - communication state.
//   id     - local identifier passed through to send_ack().
//   sock   - socket passed through to the helper handlers.
//   remote - peer the message came from.
//   msg    - the received message.
// NOTE(review): this listing omits lines of the function (else keywords,
// closing braces, the return statement); the return value is not visible.
512 int swim_receive(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) {
513 swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
515 if (msg->type == ACK) {
516 /* If ACK was received then reset the awol count */
517 if (msg->seqno == remote->outgoing.next_seqno - 1) {
518 /* Ack for most recent message. Clear saved state. */
// Moving first_seqno up to next_seqno means later messages no longer carry
// the already acknowledged value and weight.
519 remote->outgoing.first_seqno = remote->outgoing.next_seqno;
520 remote->outgoing.saved_value = 0;
521 remote->outgoing.saved_weight = 0;
524 remote->count_awol = 0;
526 /* Ignore ack if it isn't for most recent message. */
527 } else if (msg->type == MSG) {
// Case 1: nothing in this message has been seen before, so its whole value and
// weight are added to the local gossip state.
528 if (msg->min_seqno > remote->incoming.seen_seqno) {
529 /* Entirely new information */
530 remote->incoming.seen_seqno = msg->seqno;
531 remote->incoming.saved_value = msg->value;
532 remote->incoming.saved_weight = msg->weight;
533 comm->gossip.value += msg->value;
534 comm->gossip.weight += msg->weight;
535 send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
537 remote->count_rounds = 0;
538 remote->count_awol = 0;
539 remote->count_alive = 0;
541 // check if there is an update piggy backed on this message
542 // if yes then add it to the update list
543 if(msg->update_present > 0) {
// NOTE(review): new_update is allocated before the lookup and the malloc()
// result is not checked. On the visible paths where temp_remote is NULL or the
// update is ignored, no free() appears -- confirm it is released on the lines
// missing from this listing.
544 update_t *new_update = (update_t *) malloc(sizeof(update_t));
545 memset(new_update, 0, sizeof(update_t));
546 // look for the remote limiter about whom update is being
547 // sent the update node is sent in the message, msg->node!
548 remote_limiter_t *temp_remote = map_search(comm->remote_node_map, &msg->node, sizeof(remote_node_t));
549 // an update about the node itself is possible in which case map_search would fail
550 if(temp_remote != NULL) {
551 printlog(LOG_DEBUG, "SWIM: INFECT: Receiving update about %s. Update says %d\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr), msg->reachability);
552 new_update->remote = temp_remote;
553 new_update->count = 0;
// A strictly newer incarnation always overrides the locally stored state.
554 if(msg->incarnation > new_update->remote->incarnation) {
555 printlog(LOG_DEBUG, "SWIM: INFECT: Receiving update about %s. Update is about new incarnation\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr));
556 new_update->remote->reachability = msg->reachability;
557 new_update->remote->incarnation = msg->incarnation;
558 if(find_and_update(swim_comm->updates, new_update) == 0) {
559 swim_comm->updates = add_to_list(swim_comm->updates, new_update);
560 swim_comm->count_updates++;
// Same incarnation: only a REACHABLE to UNREACHABLE transition is accepted.
562 } else if(msg->incarnation == new_update->remote->incarnation && new_update->remote->reachability == REACHABLE && msg->reachability == UNREACHABLE) {
563 printlog(LOG_DEBUG, "SWIM: INFECT: Receiving update about %s. Update about same incarnation, says node unreachable\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr));
564 new_update->remote->reachability = msg->reachability;
565 if(find_and_update(swim_comm->updates, new_update) == 0) {
566 swim_comm->updates = add_to_list(swim_comm->updates, new_update);
567 swim_comm->count_updates++;
570 printlog(LOG_DEBUG, "SWIM: INFECT: update about %s ignored\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr));
// Case 2: part of the message was already accounted for, so only the
// difference from the previously saved value and weight is added.
576 else if (msg->seqno > remote->incoming.seen_seqno) {
577 /* Only some of the message is old news. */
578 double diff_value = msg->value - remote->incoming.saved_value;
579 double diff_weight = msg->weight - remote->incoming.saved_weight;
581 remote->incoming.seen_seqno = msg->seqno;
582 remote->incoming.saved_value = msg->value;
583 remote->incoming.saved_weight = msg->weight;
585 comm->gossip.value += diff_value;
586 comm->gossip.weight += diff_weight;
587 send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
589 remote->count_awol = 0;
592 /* The entire message is old news. (Duplicate). */
595 // Hearing from a node previously declared unreachable
// The peer is moved to SUSPECT with awol set exactly to the threshold, which
// is the value swim_send() tests for before asking friends to probe it.
596 if(remote->reachability == UNREACHABLE) {
597 printlog(LOG_DEBUG, "SWIM: INFECT: seems like %s is back up\n", inet_ntoa(*(struct in_addr *)&remote->addr));
598 remote->reachability = SUSPECT;
599 remote->awol = GOSSIP_REMOTE_AWOL_THRESHOLD;
600 remote->count_rounds = 0;
601 remote->count_awol = 0;
602 remote->count_alive = 0;
605 else if(msg->type == CHECK) {
606 swim_receive_check(comm, sock, remote, msg);
608 else if(msg->type == PING ) {
609 swim_receive_ping(comm, sock, remote, msg);
611 else if(msg->type == PING_ACK) {
612 swim_receive_pingack(comm, sock, remote, msg);
614 else if(msg->type == CHECK_ACK) {
615 swim_receive_checkack(comm, sock, remote, msg);
// Runs one gossip round; installed as comm->send_function by swim_init().
// Visible behaviour: the local value and weight are split into
// (gossip_branch + 1) equal shares; up to gossip_branch REACHABLE peers are
// selected (peers with unacknowledged messages first, then via
// find_gossip_target()); each selected peer is sent one share plus whatever is
// still unacknowledged for it, optionally with one randomly chosen membership
// update piggy-backed; finally swim_send() runs and the node keeps
// message_portion shares for itself.
//   comm - communication state.
//   id   - local identifier, passed on to swim_send().
//   sock - socket used for the gossip sendto() and passed to swim_send().
// NOTE(review): this listing omits lines of the function (declarations of i,
// j, msg, rand_count, retry_index and result, loop increments and breaks, else
// keywords, the update-list walk, the #endif, the return statement, closing
// braces); notes cover only the visible statements.
620 int send_gossip_swim(comm_t *comm, uint32_t id, int sock) {
624 remote_limiter_t *remote;
625 struct sockaddr_in toaddr;
626 double msg_value, msg_weight;
628 /* This is the factor for the portion of value/weight to keep locally.
629 * Normally this is 1, meaning that we retain the same amount of value/weight
630 * that was sent to the peers. In the case of not being able to send to a
631 * peer though, we increment this to reclaim the value/weight locally. */
632 int message_portion = 1;
634 memset(&toaddr, 0, sizeof(struct sockaddr_in));
635 toaddr.sin_family = AF_INET;
// One share per gossip target plus one share retained locally.
637 msg_value = comm->gossip.value / (comm->gossip.gossip_branch + 1);
638 msg_weight = comm->gossip.weight / (comm->gossip.gossip_branch + 1);
640 /*Nodes to which message was sent will have a non-zero here*/
641 /* for (i = 0; i < comm->remote_node_count; i++) {
642 if(comm->remote_limiters[i].awol > 0)
643 comm->remote_limiters[i].awol++;
646 for (i = 0; i < comm->remote_node_count; i++) {
647 printlog(LOG_DEBUG, "SWIM: STATUS: Node: %s reachability: %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), comm->remote_limiters[i].reachability);
650 for (i = 0; i < comm->gossip.gossip_branch; ++i) {
651 int targetid = NULL_PEER;
655 printlog(LOG_DEBUG, "Gossip loop iteration, i=%d, branch=%d\n", i, comm->gossip.gossip_branch);
657 /* If there are any peers with unacked messages, select them first. */
// retry_index is declared outside this loop on a line not shown, so the scan
// presumably resumes where the previous iteration stopped -- TODO confirm.
658 while (retry_index < comm->remote_node_count) {
659 if (comm->remote_limiters[retry_index].awol > 0 && comm->remote_limiters[retry_index].reachability == REACHABLE) {
660 targetid = retry_index;
661 printlog(LOG_DEBUG, "GOSSIP: Selected peerindex %d because it had unacked messages.\n", targetid);
// Fallback selection: retried while no target is chosen and rand_count stays
// below 50 (the increment of rand_count is not visible in this listing).
668 while (targetid == NULL_PEER && rand_count < 50) {
669 /* *Gossip node would be chosen from
670 * the array which would be randomly shuffled
671 * once all the nodes have been sent messages*
673 targetid = find_gossip_target(comm);
674 /* If we didn't find any peers the needed retransmissions, select one randomly here. */
675 /* targetid = myrand() % comm->remote_node_count;
678 /* Don't select an already-used index. */
679 for (j = 0; j < i; ++j) {
680 if (targetid == comm->selected[j]) {
681 printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d. selected[j=%d] is %d\n", targetid, j, comm->selected[j]);
682 targetid = NULL_PEER;
// A candidate that is not currently REACHABLE is rejected as well.
686 if (targetid != NULL_PEER) {
687 if(comm->remote_limiters[targetid].reachability != REACHABLE) {
688 printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d. reachability is %d, and remote awol count is %d\n",
689 targetid, comm->remote_limiters[targetid].reachability, comm->remote_limiters[targetid].awol);
690 targetid = NULL_PEER;
// Presumably reached when targetid is still NULL_PEER after the search (the
// guarding condition is on a line not shown): the share that could not be sent
// is reclaimed locally by raising message_portion.
698 /* Couldn't find a suitable peer to send to... */
699 message_portion += 1;
700 printlog(LOG_DEBUG, "GOSSIP: exhausted random peer search.\n");
703 printlog(LOG_DEBUG, "GOSSIP: settled on peer id %d.\n", targetid);
// Remember the choice so later iterations of this round skip the same peer.
706 remote = &comm->remote_limiters[targetid];
707 comm->selected[i] = targetid;
709 toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
710 toaddr.sin_port = remote->port;
712 memset(&msg, 0, sizeof(message_t));
713 msg.magic = MAGIC_MSG;
// The message carries this round's share plus everything previously sent to
// this peer that has not been acknowledged; min_seqno tells the receiver how
// far back that accumulated state reaches.
715 msg.value = msg_value + remote->outgoing.saved_value;
716 msg.weight = msg_weight + remote->outgoing.saved_weight;
717 msg.seqno = remote->outgoing.next_seqno;
718 msg.min_seqno = remote->outgoing.first_seqno;
720 msg.view = comm->gossip.view;
721 if(comm->gossip.membership == SWIM) {
722 swim_comm_t *swim_comm = (swim_comm_t *)comm->membership_state;
723 // piggy back an update
724 if(swim_comm->count_updates > 0) {
// Pick one pending update uniformly by position. The loop header that advances
// pointer to that position is not visible; only its body is.
725 int index = myrand() % swim_comm->count_updates;
726 update_t *pointer = swim_comm->updates;
727 update_t *prev_pointer = swim_comm->updates;
729 prev_pointer = pointer;
730 pointer = pointer->next;
733 msg.update_present = TRUE;
734 msg.reachability = pointer->remote->reachability;
735 msg.incarnation = pointer->remote->incarnation;
736 msg.node.addr = pointer->remote->addr;
737 msg.node.port = pointer->remote->port;
738 printlog(LOG_DEBUG, "SWIM: Sending update about %s\n", inet_ntoa(*(struct in_addr *)&pointer->remote->addr));
// Once an update's count equals UPDATE_THRESHOLD it is unlinked from the list
// and count_updates is decremented. The increment of pointer->count and any
// free() of the unlinked node are not visible in this listing.
740 if(swim_comm->updates != pointer && pointer->count == UPDATE_THRESHOLD) {
741 // pointer is not head
742 prev_pointer->next = pointer->next;
743 pointer->next = NULL;
745 swim_comm->count_updates--;
746 } else if(pointer->count == UPDATE_THRESHOLD) {
747 // pointer is head of the list
748 swim_comm->updates = pointer->next;
749 pointer->next = NULL;
751 swim_comm->count_updates--;
752 if(swim_comm->count_updates == 0) {
753 swim_comm->updates = NULL;
// Account for the message before it is sent: the share is added to the saved
// outgoing state until the peer acknowledges it.
758 remote->outgoing.next_seqno++;
759 remote->outgoing.saved_value += msg_value;
760 remote->outgoing.saved_weight += msg_weight;
761 /* Represents the number of messages we have sent to this peer without hearing a message or ACK from it. */
// Partition testing: targets whose bit is clear in partition_set are logged as
// ignored; the skip itself and the matching #endif are not visible.
// NOTE(review): (1 << targetid) is only defined for targetid below the bit
// width of int.
765 #ifdef ALLOW_PARTITION
767 if (do_partition && ((partition_set & (1 << targetid)) == 0)) {
768 printlog(LOG_DEBUG, "Partition: GOSSIP: ignoring targetid %d\n", targetid);
774 message_to_nbo(&msg);
776 if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
777 printlog(LOG_WARN, "WARN: limiter_send_gossip: sento failed.\n");
781 printlog(LOG_DEBUG,"SWIM: sent the gossip to %s\n", inet_ntoa(*(struct in_addr *)&toaddr.sin_addr.s_addr));
// After the gossip sends, run the SWIM failure-detection step, then keep
// message_portion shares locally (one share normally, more when targets could
// not be found).
783 swim_send(comm, id, sock);
784 comm->gossip.value = msg_value * message_portion;
785 comm->gossip.weight = msg_weight * message_portion;
790 void swim_restart(comm_t *comm, int32_t view_number) {
791 /* Not sure about this yet... */
794 int swim_init(comm_t *comm, uint32_t id) {
795 comm->membership_state = malloc(sizeof(swim_comm_t));
796 if (comm->membership_state == NULL) {
800 comm->recv_function = swim_receive;
801 comm->send_function = send_gossip_swim;
802 comm->restart_function = swim_restart;
804 swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
806 FILE *fp = fopen("/root/incarnation", "w+");
807 fscanf(fp, "%d", &swim_comm->incarnation);
808 fprintf(fp, "%d", swim_comm->incarnation + 1); // next time a greater incarnation would be read
815 void swim_teardown(comm_t *comm) {
816 if (comm->membership_state)
817 free(comm->membership_state);
820 /**Bhanu: new functions being introduced*/
821 /** Given a friend_id and the address of the suspected node
822 * this function sends a CHECK message to friend */
824 int help_from_friend(comm_t* comm, int friend_id, in_addr_t addr, in_port_t port, uint32_t id, int sock) {
825 // send a CHECK message to friend
828 memset(&msg, 0, sizeof(message_t));
829 msg.magic = MAGIC_MSG;
836 msg.check_target = addr;
837 msg.check_port = port;
840 struct sockaddr_in toaddr;
841 memset(&toaddr, 0, sizeof(sockaddr_in));
842 toaddr.sin_family = AF_INET;
843 toaddr.sin_addr.s_addr = comm->remote_limiters[friend_id].addr;
844 toaddr.sin_port = comm->remote_limiters[friend_id].port;
845 message_to_nbo(&msg);
847 if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
848 printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
850 printlog(LOG_WARN, " - The error was |%d|\n", strerror(result));