Updates to autotools for library detection
[distributedratelimiting.git] / drl / peer_comm.c
1 /* See the DRL-LICENSE file for this file's software license. */
2
3 #define _XOPEN_SOURCE 600
4
5 /* Debug output. */
6 #include <stdio.h>
7 #include <stdlib.h>
8
9 /* Socket functions. */
10 #include <sys/types.h>
11 #include <sys/socket.h>
12
13 /* Byte ordering and address structures. */
14 #include <arpa/inet.h>
15
16 /* memset() */
17 #include <string.h>
18
19 /* close() & usleep */
20 #include <unistd.h>
21
22 /* Mutex lock/unlock. */
23 #include <pthread.h>
24
25 /* perror() */
26 #include <errno.h>
27
28 /* select() w/ timeout */
29 #include <sys/select.h>
30 #include <sys/time.h>
31
32 /* assert() */
33 #include <assert.h>
34
35 /* sigaddset(), sigemptyset(), SIGHUP, etc. */
36 #include <signal.h>
37
38 /* DRL data structures. */
39 #include "raterouter.h"
40 #include "ratetypes.h"
41 #include "drl_state.h"
42 #include "peer_comm.h"
43 #include "logging.h"
44
45 #define NULL_PEER (-2)
46 #define MESH_REMOTE_AWOL_THRESHOLD (5)
47 #define GOSSIP_REMOTE_AWOL_THRESHOLD (5)
48
49 /* From ulogd_DRL.c */
50 extern int do_partition;
51 extern int partition_set;
52
53 extern limiter_t limiter;
54
55 void message_to_hbo(message_t *msg) {
56     msg->magic = ntohl(msg->magic);
57     msg->ident_id = ntohl(msg->ident_id);
58     /* value is a double */
59     /* weight is a double */
60     msg->seqno = ntohl(msg->seqno);
61     msg->min_seqno = ntohl(msg->min_seqno);
62     msg->type = ntohs(msg->type);
63     /* ping_source, ping_port, check_target, and check_port stay in nbo. */
64     msg->checkack_value = ntohl(msg->checkack_value);
65     msg->update_present = ntohl(msg->update_present);
66     msg->reachability = ntohl(msg->reachability);
67     msg->incarnation = ntohl(msg->incarnation);
68     /* node has two fields, both stay in nbo. */
69     msg->view = ntohl(msg->view);
70 }
71
72 void message_to_nbo(message_t *msg) {
73     msg->magic = htonl(msg->magic);
74     msg->ident_id = htonl(msg->ident_id);
75     /* value is a double */
76     /* weight is a double */
77     msg->seqno = htonl(msg->seqno);
78     msg->min_seqno = htonl(msg->min_seqno);
79     msg->type = htons(msg->type);
80     /* ping_source, ping_port, check_target, and check_port already in nbo. */
81     msg->checkack_value = htonl(msg->checkack_value);
82     msg->update_present = htonl(msg->update_present);
83     msg->reachability = htonl(msg->reachability);
84     msg->incarnation = htonl(msg->incarnation);
85     /* node has two fields, both already in nbo. */
86     msg->view = htonl(msg->view);
87 }
88
89 int send_ack(uint32_t id, remote_limiter_t *remote, uint32_t seqno, uint16_t type, int32_t view) {
90     int result = 0;
91     message_t msg;
92     struct sockaddr_in toaddr;
93
94     memset(&toaddr, 0, sizeof(struct sockaddr_in));
95     toaddr.sin_family = AF_INET;
96
97     toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
98     toaddr.sin_port = remote->port;
99
100     memset(&msg, 0, sizeof(msg));
101     msg.magic = MAGIC_MSG;
102     msg.ident_id = id;
103     msg.type = type;
104     msg.seqno = seqno;
105     msg.view = view;
106
107     message_to_nbo(&msg);
108
109     if (sendto(limiter.udp_socket, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
110         printlog(LOG_WARN, "send_ack: sento failed.\n");
111         result = errno;
112     }
113
114     return result;
115 }
116
117 void limiter_receive() {
118     struct sockaddr_in fromaddr;
119     remote_node_t sender;
120     socklen_t fromlen = sizeof(fromaddr);
121     identity_t *ident = NULL;
122     remote_limiter_t *remote = NULL;
123     message_t msg;
124
125     if (recvfrom(limiter.udp_socket, &msg, sizeof(msg), MSG_WAITALL, (struct sockaddr *) &fromaddr, (socklen_t *) &fromlen) != sizeof(msg)) {
126         /* recv failed.  Log and continue. */
127         printlog(LOG_WARN, "recv failed to read full message.\n");
128         return;
129     }
130     memset(&sender, 0, sizeof(remote_node_t));
131     sender.addr = fromaddr.sin_addr.s_addr;
132     sender.port = fromaddr.sin_port;
133
134     message_to_hbo(&msg);
135
136     assert(msg.magic == MAGIC_MSG);
137
138 #if 0
139     printlog(LOG_WARN, "Rcvd (value, weight) : (%f, %f) from ident %d (net order host 0x%x port %d) key size(%d)\n", 
140             msg.value, msg.weight, msg.ident_id, sender.addr,sender.port,sizeof(remote_node_t));
141 #endif
142     pthread_testcancel();
143
144     pthread_rwlock_rdlock(&limiter.limiter_lock);
145
146     ident = map_search(limiter.stable_instance.ident_map, &msg.ident_id,
147                        sizeof(msg.ident_id));
148
149     if (ident == NULL) {
150         printlog(LOG_WARN, "WARN:recvd message for unknown identity.\n");
151         pthread_rwlock_unlock(&limiter.limiter_lock);
152         return;
153     }
154
155     pthread_mutex_lock(&ident->comm.lock);
156
157     remote = map_search(ident->comm.remote_node_map, &sender, sizeof(remote_node_t));
158
159     if (remote == NULL) {
160         printlog(LOG_WARN, "WARN: recvd msg from unknown entity.\n");
161         pthread_mutex_unlock(&ident->comm.lock);
162         pthread_rwlock_unlock(&limiter.limiter_lock);
163         return;
164     }
165
166     /* Pass the message to the comm's recv function, which is responsible for
167      * processing its contents. */
168     ident->comm.recv_function(&ident->comm, ident->id, limiter.udp_socket, remote, &msg);
169     
170     pthread_mutex_unlock(&ident->comm.lock);
171     pthread_rwlock_unlock(&limiter.limiter_lock);
172 }
173
174 int recv_mesh(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) {
175     /* Use the message's value to be our new GRDrate/FPSweight for the
176      * message's sender. */
177     remote->rate = msg->value;
178
179     /* Reset the AWOL counter to zero since we received an update. */
180     remote->awol = 0;
181     remote->reachability = REACHABLE;
182
183     return 0;
184 }
185
186 int send_udp_mesh(comm_t *comm, uint32_t id, int sock) {
187     int result = 0;
188     remote_limiter_t *remote;
189     message_t msg;
190     struct sockaddr_in toaddr;
191     int i;
192
193 #ifdef ALLOW_PARTITION
194
195     int partition_count = 0;
196     struct in_addr dest;
197     char dest_ip[22];
198
199 #endif
200
201     memset(&toaddr, 0, sizeof(struct sockaddr_in));
202     toaddr.sin_family = AF_INET;
203
204     memset(&msg, 0, sizeof(message_t));
205     msg.magic = MAGIC_MSG;
206     msg.ident_id = id;
207     msg.value = comm->local_rate;
208     msg.view = comm->gossip.view;
209     /* Do we want seqnos for mesh?  We can get by without them. */
210
211     message_to_nbo(&msg);
212
213     /* Iterate though and send update to all remote limiters in our identity. */
214     for (i = 0; i < comm->remote_node_count; ++i) {
215         remote = &comm->remote_limiters[i];
216
217         /* Increase this counter.  For mesh, it represents the number of messages we have sent to
218          * this remote limiter without having heard from it.  This is reset to 0 when we receive
219          * an update from this peer. */
220         remote->awol += 1;
221         if (remote->awol > MESH_REMOTE_AWOL_THRESHOLD) {
222             remote->reachability = UNREACHABLE;
223         }
224
225 #ifdef ALLOW_PARTITION
226
227         if (do_partition) {
228             printlog(LOG_DEBUG, "Testing partition, partition set is %x, count is %d, test is %d.\n",
229                      partition_set, partition_count, partition_set & (1 << partition_count));
230             /* If the partition count bit isn't high in the set, don't actually send anything. */
231             if ((partition_set & (1 << partition_count)) == 0) {
232                 dest.s_addr = ntohl(remote->addr);
233                 strcpy(dest_ip, inet_ntoa(dest));
234
235                 printlog(LOG_DEBUG, "Partition: ignoring host %s\n", dest_ip);
236
237                 partition_count += 1;
238                 continue;
239             }
240         }
241
242         partition_count += 1;
243
244 #endif
245
246         toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
247         toaddr.sin_port = remote->port;
248         if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
249             printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
250             result = errno;
251             printlog(LOG_WARN, "  - The error was |%d|\n", strerror(result));
252             break;
253         }
254     }
255
256     return result;
257 }
258
259 int recv_gossip(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) {
260     if (msg->type == ACK) {
261         /* If ACK was received then reset the awol count */
262         if (msg->seqno == remote->outgoing.next_seqno - 1) {
263             /* Ack for most recent message.  Clear saved state. */
264             remote->outgoing.first_seqno = remote->outgoing.next_seqno;
265             remote->outgoing.saved_value = 0;
266             remote->outgoing.saved_weight = 0;
267
268             remote->awol = 0;
269         }
270         /* Ignore ack if it isn't for most recent message. */
271     } else if (msg->type == MSG) {
272         if (msg->min_seqno > remote->incoming.seen_seqno) {
273             /* Entirely new information */
274             remote->incoming.seen_seqno = msg->seqno;
275             remote->incoming.saved_value = msg->value;
276             remote->incoming.saved_weight = msg->weight;
277             comm->gossip.value += msg->value;
278             comm->gossip.weight += msg->weight;
279             send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
280             remote->awol = 0;
281         } 
282         else if (msg->seqno > remote->incoming.seen_seqno) {
283             /* Only some of the message is old news. */
284             double diff_value = msg->value - remote->incoming.saved_value;
285             double diff_weight = msg->weight - remote->incoming.saved_weight;
286
287             remote->incoming.seen_seqno = msg->seqno;
288             remote->incoming.saved_value = msg->value;
289             remote->incoming.saved_weight = msg->weight;
290
291             comm->gossip.value += diff_value;
292             comm->gossip.weight += diff_weight;
293             send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
294             remote->awol = 0;
295         } 
296         else {
297             /* The entire message is old news. (Duplicate). */
298             /* Do nothing. */
299         }
300     }
301
302     return 0;
303 }
304
305 int find_gossip_target(comm_t *comm) {
306     int target = NULL_PEER;
307     int k;
308
309     if (comm->shuffle_index < comm->remote_node_count) {
310         target = comm->indices[comm->shuffle_index];
311         printlog(LOG_DEBUG,"GOSSIP: found index %d.\n", target);
312         comm->shuffle_index++;
313     }
314     else {
315         // shuffle the remote_limiters array
316         printlog(LOG_DEBUG, "GOSSIP: shuffling the array.\n");
317         for ( k = 0; k < comm->remote_node_count; k++) {
318             uint32_t l = myrand() % comm->remote_node_count;
319             int t;
320             t = comm->indices[l];
321             comm->indices[l] = comm->indices[k];
322             comm->indices[k] = t;
323         }
324         comm->shuffle_index = 0;
325         target = comm->indices[comm->shuffle_index];
326         printlog(LOG_DEBUG,"GOSSIP: found index after spilling over %d.\n", target);
327         comm->shuffle_index++;
328     }
329     return target;
330 }
331
332 int send_udp_gossip(comm_t *comm, uint32_t id, int sock) {
333     int i, j;
334     int retry_index = 0;
335     int result = 0;
336     remote_limiter_t *remote;
337     struct sockaddr_in toaddr;
338     double msg_value, msg_weight;
339
340     /* This is the factor for the portion of value/weight to keep locally.
341      * Normally this is 1, meaning that we retain the same amount of value/weight
342      * that was sent to the peers.  In the case of not being able to send to a
343      * peer though, we increment this to reclaim the value/weight locally. */
344     int message_portion = 1;
345
346     memset(&toaddr, 0, sizeof(struct sockaddr_in));
347     toaddr.sin_family = AF_INET;
348
349     msg_value = comm->gossip.value / (comm->gossip.gossip_branch + 1);
350     msg_weight = comm->gossip.weight / (comm->gossip.gossip_branch + 1);
351
352     for (i = 0; i < comm->gossip.gossip_branch; ++i) {
353         int targetid = NULL_PEER;
354         int rand_count = 0;
355         message_t msg;
356
357         printlog(LOG_DEBUG, "Gossip loop iteration, i=%d, branch=%d\n", i, comm->gossip.gossip_branch);
358
359         /* If there are any peers with unacked messages, select them first. */
360         while (retry_index < comm->remote_node_count) {
361             if (comm->remote_limiters[retry_index].awol > 0 && comm->remote_limiters[retry_index].reachability == REACHABLE) {
362                 targetid = retry_index;
363                 printlog(LOG_DEBUG, "GOSSIP: Selected peerindex %d because it had unacked messages.\n", targetid);
364             }
365
366             retry_index += 1;
367         }
368
369         while (targetid == NULL_PEER && rand_count < 10) {
370             /* Select a recipient from a randomly-shuffled array. */
371             targetid = find_gossip_target(comm);
372
373             assert(targetid != NULL_PEER);
374
375             /* Don't select an already-used index. */
376             for (j = 0; j < i; ++j) {
377                 if (targetid == comm->selected[j]) {
378                     printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d.  selected[j=%d] is %d\n", targetid, j, comm->selected[j]); 
379                     targetid = NULL_PEER;
380                     break;
381                 }
382             }
383
384             /* Don't select an unreachable peer or one that is not in our view. */
385             if (targetid != NULL_PEER) {
386                 if (comm->remote_limiters[targetid].reachability != REACHABLE ||
387                         comm->remote_limiters[targetid].view != comm->gossip.view ||
388                         comm->remote_limiters[targetid].view_confidence != IN) {
389                     printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d, reachability is %d, remote view is %d (confidence:%d), my view is %d\n",
390                             targetid, comm->remote_limiters[targetid].reachability, comm->remote_limiters[targetid].view,
391                             comm->remote_limiters[targetid].view_confidence, comm->gossip.view);
392                     targetid = NULL_PEER;
393                 }
394             }
395
396             rand_count++;
397         }
398
399         if (targetid == NULL_PEER) {
400             /* Couldn't find a suitable peer to send to... */
401             message_portion += 1;
402             printlog(LOG_DEBUG, "GOSSIP: exhausted random peer search.\n");
403             continue;
404         } else {
405             printlog(LOG_DEBUG, "GOSSIP: settled on peer id %d.\n", targetid);
406         }
407
408         remote = &comm->remote_limiters[targetid];
409         comm->selected[i] = targetid;
410
411         toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
412         toaddr.sin_port = remote->port;
413
414         memset(&msg, 0, sizeof(message_t));
415         msg.magic = MAGIC_MSG;
416         msg.ident_id = id;
417         msg.value = msg_value + remote->outgoing.saved_value;
418         msg.weight = msg_weight + remote->outgoing.saved_weight;
419         msg.seqno = remote->outgoing.next_seqno;
420         msg.min_seqno = remote->outgoing.first_seqno;
421         msg.type = MSG;
422         msg.view = comm->gossip.view;
423         
424         remote->outgoing.next_seqno++;
425         remote->outgoing.saved_value += msg_value;
426         remote->outgoing.saved_weight += msg_weight;
427         /* Represents the number of messages we have sent to this peer without hearing a message or ACK from it. */
428         remote->awol += 1;
429
430
431 #ifdef ALLOW_PARTITION
432
433         if (do_partition && ((partition_set & (1 << targetid)) == 0)) {
434             printlog(LOG_DEBUG, "Partition: GOSSIP: ignoring targetid %d\n", targetid);
435             continue;
436         }
437
438 #endif
439
440         message_to_nbo(&msg);
441
442         if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
443             printlog(LOG_WARN, "WARN: limiter_send_gossip: sento failed.\n");
444             result = errno;
445             break;
446         }
447     }
448     comm->gossip.value = msg_value * message_portion;
449     comm->gossip.weight = msg_weight * message_portion;
450
451     return result;
452 }
453
454
455 /* Old TCP code. */
456 #if 0
457 static void hello_to_hbo(hello_t *hello) {
458     hello->magic = ntohl(hello->magic);
459     hello->ident_id = ntohl(hello->ident_id);
460     hello->port = ntohs(hello->port);
461 }
462
463 static void hello_to_nbo(hello_t *hello) {
464     hello->magic = htonl(hello->magic);
465     hello->ident_id = htonl(hello->ident_id);
466     hello->port = htons(hello->port);
467 }
468
469 static int is_connected(remote_limiter_t *remote) {
470     struct sockaddr_in addr;
471     socklen_t addrlen = sizeof(addr);
472
473     if (getpeername(remote->socket, (struct sockaddr *) &addr, &addrlen) == 0)
474         return 1;
475     else
476         return 0;
477 }
478
479 int send_tcp_gossip(comm_ident_t *ident, FILE *logfile, int unused) {
480     int i, targetid, sock;
481     int result = 0;
482     message_t msg;
483
484     memset(&msg, 0, sizeof(message_t));
485     msg.magic = MAGIC_MSG;
486     msg.ident_id = ident->ident_id;
487     msg.value = ident->gossip.value / (ident->gossip.gossip_branch + 1);
488     msg.weight = ident->gossip.weight / (ident->gossip.gossip_branch + 1);
489
490     message_to_nbo(&msg);
491
492     for (i = 0; i < ident->gossip.gossip_branch; ++i) {
493         targetid = myrand() % ident->remote_node_count;
494         sock = ident->remote_limiters[targetid].socket;
495
496         result = send(sock, &msg, sizeof(message_t), 0);
497         if (result < 0) {
498             result = errno;
499             FD_CLR(sock, &ident->fds);
500             close(sock);
501             break;
502         }
503
504         assert(result == sizeof(message_t));
505     }
506
507     ident->gossip.value /= (ident->gossip.gossip_branch + 1);
508     ident->gossip.weight /= (ident->gossip.gossip_branch + 1);
509
510     return result;
511 }
512
513 void *limiter_accept_thread(void *limiter) {
514     sigset_t signal_mask;
515
516     sigemptyset(&signal_mask);
517     sigaddset(&signal_mask, SIGHUP);
518     pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
519
520     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
521     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
522     while (1) {
523         limiter_accept((comm_limiter_t *) limiter);
524     }
525     pthread_exit(NULL);
526 }
527 void *ident_receive_thread(void *recv_args) {
528     int i, sock, result;
529     struct recv_thread_args *args = (struct recv_thread_args *) recv_args;
530     comm_ident_t *ident = args->ident;
531     pthread_rwlock_t *lock = args->lock;
532     uint16_t port = args->port;
533     struct sockaddr_in addr;
534     socklen_t addrlen = sizeof(addr);
535     hello_t hello;
536
537     free(args);
538
539     while (1) {
540         memset(&hello, 0, sizeof(hello_t));
541
542         /*Try to connect to all remote nodes if they aren't already connected.*/
543         pthread_rwlock_rdlock(lock);
544         pthread_mutex_lock(&ident->lock);
545
546         hello.magic = MAGIC_HELLO;
547         hello.ident_id = ident->ident_id;
548         hello.port = ntohs(port);
549
550         hello_to_nbo(&hello);
551
552         for (i = 0; i < ident->remote_node_count; ++i) {
553             if (is_connected(&ident->remote_limiters[i]))
554                 continue; /* Ignore if it's already connected */
555
556             sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
557             if (sock < 0) {
558                 perror("socket");
559                 continue;
560             }
561
562             assert(sock >= 0);
563
564             memset(&addr, 0, sizeof(struct sockaddr_in));
565             addr.sin_family = AF_INET;
566             addr.sin_port = ident->remote_limiters[i].port;
567             addr.sin_addr.s_addr = ident->remote_limiters[i].addr;
568
569             result = connect(sock, (struct sockaddr *) &addr, addrlen);
570             if (result < 0) {
571                 close(sock);
572                 continue;
573             }
574
575             result = send(sock, &hello, sizeof(hello_t), 0);
576             if (result < 0) {
577                 close(sock);
578                 continue;
579             }
580
581             assert(result == sizeof(hello_t));
582
583             ident->remote_limiters[i].socket = sock;
584             printf("Connected on socket: %d\n", sock);
585             FD_SET(sock, &ident->fds);
586         }
587
588         pthread_rwlock_unlock(lock);
589         pthread_mutex_unlock(&ident->lock);
590
591         ident_receive(ident, lock);
592     }
593     pthread_exit(NULL);
594 }
595
596 static void limiter_accept(comm_limiter_t *limiter) {
597     int sock, result;
598     struct sockaddr_in fromaddr;
599     socklen_t fromlen = sizeof(fromaddr);
600     remote_node_t sender;
601     remote_limiter_t *remote;
602     hello_t hello;
603     comm_ident_t *ident;
604     dent_handle *handle = NULL;
605
606     sock = accept(limiter->tcp_socket, (struct sockaddr *)&fromaddr, &fromlen);
607
608     assert(sock > 0);
609
610     memset(&hello, 0, sizeof(hello_t));
611     result = recv(sock, &hello, sizeof(hello_t), 0);
612
613     if (result < 0) {
614         close(sock);
615         return; /* Failure - ignore it. */
616     }
617
618     assert(result == sizeof(hello_t));
619
620     hello_to_hbo(&hello);
621
622     assert(hello.magic == MAGIC_HELLO);
623
624     memset(&sender, 0, sizeof(remote_node_t));
625     sender.addr = fromaddr.sin_addr.s_addr;
626     sender.port = ntohs(hello.port);
627
628     pthread_testcancel();
629
630     pthread_rwlock_rdlock(&limiter->rwlock);
631
632     handle = map_search(limiter->ident_id_to_handle, (void *) &hello.ident_id, sizeof(hello.ident_id));
633
634     if (handle == NULL) {
635         printlog(LOG_WARN, "WARN:recvd hello for unknown identity.\n");
636         pthread_rwlock_unlock(&limiter->rwlock);
637         return;
638     }
639
640     ident = limiter->identities[*handle];
641     assert(ident != NULL);
642
643     pthread_mutex_lock(&ident->lock);
644
645     remote = map_search(ident->remote_node_map, &sender, sizeof(remote_node_t));
646
647     if (remote == NULL) {
648         printlog(LOG_WARN, "WARN: Accepted connection from unknown identity.\n");
649         pthread_mutex_unlock(&ident->lock);
650         pthread_rwlock_unlock(&limiter->rwlock);
651         close(sock);
652         return;
653     }
654
655     if (is_connected(remote)) {
656         /* We are still connected, don't need the new socket. */
657         close(sock);
658         pthread_mutex_unlock(&ident->lock);
659         pthread_rwlock_unlock(&limiter->rwlock);
660         return;
661     }
662
663     /* We weren't connected, but we are now... */
664     remote->socket = sock;
665     printf("Got connection on: %d\n", sock);
666     FD_SET(sock, &ident->fds);
667
668     pthread_mutex_unlock(&ident->lock);
669     pthread_rwlock_unlock(&limiter->rwlock);
670 }
671
672 static void read_tcp_message(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock, int sock) {
673     int result;
674     message_t msg;
675
676     memset(&msg, 0, sizeof(message_t));
677
678     result = recv(sock, &msg, sizeof(message_t), 0);
679
680     if (result < 0) {
681         pthread_rwlock_rdlock(limiter_rwlock);
682         pthread_mutex_lock(&ident->lock);
683         FD_CLR(sock, &ident->fds);
684         close(sock);
685         pthread_mutex_unlock(&ident->lock);
686         pthread_rwlock_unlock(limiter_rwlock);
687         return;
688     }
689
690     assert(result == sizeof(message_t));
691
692     message_to_hbo(&msg);
693     assert(msg.magic == MAGIC_MSG);
694
695     pthread_rwlock_rdlock(limiter_rwlock);
696     pthread_mutex_lock(&ident->lock);
697
698     switch (ident->comm_fabric) {
699         case COMM_GOSSIP: {
700             ident->gossip.value += msg.value;
701             ident->gossip.weight += msg.weight;
702         }
703         break;
704
705         default: {
706             assert(1 == 0); /* This case shouldn't happen. Punt for now... */
707         }
708     }
709     pthread_mutex_unlock(&ident->lock);
710     pthread_rwlock_unlock(limiter_rwlock);
711 }
712
713 static void ident_receive(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock) {
714     int select_result, i;
715     fd_set fds_copy;
716     struct timeval timeout;
717
718     FD_ZERO(&fds_copy);
719     timeout.tv_sec = 15;
720     timeout.tv_usec = 0;
721
722     pthread_rwlock_rdlock(limiter_rwlock);
723     pthread_mutex_lock(&ident->lock);
724     memcpy(&fds_copy, &ident->fds, sizeof(fd_set));
725     pthread_mutex_unlock(&ident->lock);
726     pthread_rwlock_unlock(limiter_rwlock);
727     
728     /* mask interrupt signals for this thread? */
729
730     select_result = select(FD_SETSIZE, &fds_copy, NULL, NULL, &timeout);
731
732     assert(select_result >= 0);
733     
734     if (select_result == 0)
735         return; /* Timed out */
736
737     for (i = 0; (i < FD_SETSIZE) && select_result; ++i) {
738         if (FD_ISSET(i, &fds_copy)) {
739             read_tcp_message(ident, limiter_rwlock, i);
740             select_result--;
741         }
742     }
743 }
744 #endif