eba9637fb13e81d98d10991513483832e1d3ba30
[distributedratelimiting.git] / drl / peer_comm.c
1 /* See the DRL-LICENSE file for this file's software license. */
2
3 #define _XOPEN_SOURCE 600
4
5 /* Debug output. */
6 #include <stdio.h>
7 #include <stdlib.h>
8
9 /* Socket functions. */
10 #include <sys/types.h>
11 #include <sys/socket.h>
12
13 /* Byte ordering and address structures. */
14 #include <arpa/inet.h>
15
16 /* memset() */
17 #include <string.h>
18
19 /* close() & usleep */
20 #include <unistd.h>
21
22 /* Mutex lock/unlock. */
23 #include <pthread.h>
24
25 /* perror() */
26 #include <errno.h>
27
28 /* select() w/ timeout */
29 #include <sys/select.h>
30 #include <sys/time.h>
31
32 /* assert() */
33 #include <assert.h>
34
35 /* sigaddset(), sigemptyset(), SIGHUP, etc. */
36 #include <signal.h>
37
38 /* DRL data structures. */
39 #include "raterouter.h"
40 #include "ratetypes.h"
41 #include "drl_state.h"
42 #include "peer_comm.h"
43 #include "logging.h"
44
45 /* Artifically makes a network partition. */
46 int do_partition = 0;
47 int partition_set = 0xfffffff;
48
49 extern limiter_t limiter;
50
51 static const uint32_t MAGIC_MSG = 0x123123;
52 static const uint32_t MAGIC_HELLO = 0x456456;
53 static const uint16_t MSG = 1;
54 static const uint16_t ACK = 2;
55
56 static void message_to_hbo(message_t *msg) {
57     msg->magic = ntohl(msg->magic);
58     msg->ident_id = ntohl(msg->ident_id);
59     msg->seqno = ntohl(msg->seqno);
60     msg->min_seqno = ntohl(msg->min_seqno);
61     msg->type = ntohs(msg->type);
62     /* value is a double */
63     /* weight is a double */
64 }
65
66 static void message_to_nbo(message_t *msg) {
67     msg->magic = htonl(msg->magic);
68     msg->ident_id = htonl(msg->ident_id);
69     msg->seqno = htonl(msg->seqno);
70     msg->min_seqno = htonl(msg->min_seqno);
71     msg->type = htons(msg->type);
72     /* value is a double */
73     /* weight is a double */
74 }
75
76 static void hello_to_hbo(hello_t *hello) {
77     hello->magic = ntohl(hello->magic);
78     hello->ident_id = ntohl(hello->ident_id);
79     hello->port = ntohs(hello->port);
80 }
81
82 static void hello_to_nbo(hello_t *hello) {
83     hello->magic = htonl(hello->magic);
84     hello->ident_id = htonl(hello->ident_id);
85     hello->port = htons(hello->port);
86 }
87
88 static int is_connected(remote_limiter_t *remote) {
89     struct sockaddr_in addr;
90     socklen_t addrlen = sizeof(addr);
91
92     if (getpeername(remote->socket, (struct sockaddr *) &addr, &addrlen) == 0)
93         return 1;
94     else
95         return 0;
96 }
97
98 static int send_ack(identity_t *ident, remote_limiter_t *remote, uint32_t seqno) {
99     int result = 0;
100     message_t msg;
101     struct sockaddr_in toaddr;
102
103     memset(&toaddr, 0, sizeof(struct sockaddr_in));
104     toaddr.sin_family = AF_INET;
105
106     toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
107     toaddr.sin_port = remote->port;
108
109     memset(&msg, 0, sizeof(msg));
110     msg.magic = MAGIC_MSG;
111     msg.ident_id = ident->id;
112     msg.type = ACK;
113     msg.seqno = seqno;
114
115     message_to_nbo(&msg);
116
117     if (sendto(limiter.udp_socket, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
118         printlog(LOG_WARN, "send_ack: sento failed.\n");
119         result = errno;
120     }
121
122     return result;
123 }
124
125 void limiter_receive() {
126     struct sockaddr_in fromaddr;
127     remote_node_t sender;
128     socklen_t fromlen = sizeof(fromaddr);
129     identity_t *ident = NULL;
130     remote_limiter_t *remote = NULL;
131     message_t msg;
132
133     if (recvfrom(limiter.udp_socket, &msg, sizeof(msg), MSG_WAITALL, (struct sockaddr *) &fromaddr, (socklen_t *) &fromlen) != sizeof(msg)) {
134         /* recv failed.  Log and continue. */
135         printlog(LOG_WARN, "recv failed to read full message.\n");
136         return;
137     }
138     memset(&sender, 0, sizeof(remote_node_t));
139     sender.addr = fromaddr.sin_addr.s_addr;
140     sender.port = fromaddr.sin_port;
141
142     message_to_hbo(&msg);
143
144     assert(msg.magic == MAGIC_MSG);
145
146 #if 0
147     printlog(LOG_WARN, "Rcvd (value, weight) : (%f, %f) from ident %d (net order host 0x%x port %d) key size(%d)\n", 
148             msg.value, msg.weight, msg.ident_id, sender.addr,sender.port,sizeof(remote_node_t));
149 #endif
150     pthread_testcancel();
151
152     pthread_rwlock_rdlock(&limiter.limiter_lock);
153
154     ident = map_search(limiter.stable_instance.ident_map, &msg.ident_id,
155                        sizeof(msg.ident_id));
156
157     if (ident == NULL) {
158         printlog(LOG_WARN, "WARN:recvd message for unknown identity.\n");
159         pthread_rwlock_unlock(&limiter.limiter_lock);
160         return;
161     }
162
163     pthread_mutex_lock(&ident->comm.lock);
164
165     remote = map_search(ident->comm.remote_node_map, &sender, sizeof(remote_node_t));
166
167     if (remote == NULL) {
168         printlog(LOG_WARN, "WARN: recvd msg from unknown entity.\n");
169         pthread_mutex_unlock(&ident->comm.lock);
170         pthread_rwlock_unlock(&limiter.limiter_lock);
171         return;
172     }
173
174     switch (ident->comm.comm_fabric) {
175         case COMM_MESH: {
176             /* Use the message's value to be our new GRDrate/FPSweight for the
177              * message's sender. */
178             remote->rate = msg.value;
179
180             /* Reset the AWOL counter to zero since we received an update. */
181             remote->awol = 0;
182         }
183         break;
184
185         case COMM_GOSSIP: {
186             if (msg.type == ACK) {
187                 if (msg.seqno == remote->outgoing.next_seqno - 1) {
188                     int i;
189
190                     /* Ack for most recent message.  Clear saved state. */
191                     remote->outgoing.first_seqno = remote->outgoing.next_seqno;
192                     remote->outgoing.saved_value = 0;
193                     remote->outgoing.saved_weight = 0;
194
195                     for (i = 0; i < ident->comm.gossip.gossip_branch; ++i) {
196                         if (ident->comm.retrys[i] >= 0 &&
197                             remote == &ident->comm.remote_limiters[ident->comm.retrys[i]]) {
198                                 ident->comm.retrys[i] = -2;
199                         }
200                     }
201                 }
202                 /* Ignore ack if it isn't for most recent message. */
203             } else {
204                 if (msg.min_seqno > remote->incoming.seen_seqno) {
205                     /* Entirely new information */
206                     remote->incoming.seen_seqno = msg.seqno;
207                     remote->incoming.saved_value = msg.value;
208                     remote->incoming.saved_weight = msg.weight;
209                     ident->comm.gossip.value += msg.value;
210                     ident->comm.gossip.weight += msg.weight;
211                     send_ack(ident, remote, msg.seqno);
212                     remote->awol = 0;
213                 } else if (msg.seqno > remote->incoming.seen_seqno) {
214                     /* Only some of the message is old news. */
215                     double diff_value = msg.value - remote->incoming.saved_value;
216                     double diff_weight = msg.weight - remote->incoming.saved_weight;
217
218                     remote->incoming.seen_seqno = msg.seqno;
219                     remote->incoming.saved_value = msg.value;
220                     remote->incoming.saved_weight = msg.weight;
221
222                     ident->comm.gossip.value += diff_value;
223                     ident->comm.gossip.weight += diff_weight;
224                     send_ack(ident, remote, msg.seqno);
225                     remote->awol = 0;
226                 } else {
227                     /* The entire message is old news. (Duplicate). */
228                     /* Do nothing. */
229                 }
230             }
231         }
232         break;
233
234         default: {
235             printlog(LOG_CRITICAL, "ERR: Unknown identity comm fabric.\n");
236         }
237     }
238
239     pthread_mutex_unlock(&ident->comm.lock);
240     pthread_rwlock_unlock(&limiter.limiter_lock);
241 }
242
243 #if 0
244 static void limiter_accept(comm_limiter_t *limiter) {
245     int sock, result;
246     struct sockaddr_in fromaddr;
247     socklen_t fromlen = sizeof(fromaddr);
248     remote_node_t sender;
249     remote_limiter_t *remote;
250     hello_t hello;
251     comm_ident_t *ident;
252     ident_handle *handle = NULL;
253
254     sock = accept(limiter->tcp_socket, (struct sockaddr *)&fromaddr, &fromlen);
255
256     assert(sock > 0);
257
258     memset(&hello, 0, sizeof(hello_t));
259     result = recv(sock, &hello, sizeof(hello_t), 0);
260
261     if (result < 0) {
262         close(sock);
263         return; /* Failure - ignore it. */
264     }
265
266     assert(result == sizeof(hello_t));
267
268     hello_to_hbo(&hello);
269
270     assert(hello.magic == MAGIC_HELLO);
271
272     memset(&sender, 0, sizeof(remote_node_t));
273     sender.addr = fromaddr.sin_addr.s_addr;
274     sender.port = ntohs(hello.port);
275
276     pthread_testcancel();
277
278     pthread_rwlock_rdlock(&limiter->rwlock);
279
280     handle = map_search(limiter->ident_id_to_handle, (void *) &hello.ident_id, sizeof(hello.ident_id));
281
282     if (handle == NULL) {
283         printlog(LOG_WARN, "WARN:recvd hello for unknown identity.\n");
284         pthread_rwlock_unlock(&limiter->rwlock);
285         return;
286     }
287
288     ident = limiter->identities[*handle];
289     assert(ident != NULL);
290
291     pthread_mutex_lock(&ident->lock);
292
293     remote = map_search(ident->remote_node_map, &sender, sizeof(remote_node_t));
294
295     if (remote == NULL) {
296         printlog(LOG_WARN, "WARN: Accepted connection from unknown identity.\n");
297         pthread_mutex_unlock(&ident->lock);
298         pthread_rwlock_unlock(&limiter->rwlock);
299         close(sock);
300         return;
301     }
302
303     if (is_connected(remote)) {
304         /* We are still connected, don't need the new socket. */
305         close(sock);
306         pthread_mutex_unlock(&ident->lock);
307         pthread_rwlock_unlock(&limiter->rwlock);
308         return;
309     }
310
311     /* We weren't connected, but we are now... */
312     remote->socket = sock;
313     printf("Got connection on: %d\n", sock);
314     FD_SET(sock, &ident->fds);
315
316     pthread_mutex_unlock(&ident->lock);
317     pthread_rwlock_unlock(&limiter->rwlock);
318 }
319
320 static void read_tcp_message(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock, int sock) {
321     int result;
322     message_t msg;
323
324     memset(&msg, 0, sizeof(message_t));
325
326     result = recv(sock, &msg, sizeof(message_t), 0);
327
328     if (result < 0) {
329         pthread_rwlock_rdlock(limiter_rwlock);
330         pthread_mutex_lock(&ident->lock);
331         FD_CLR(sock, &ident->fds);
332         close(sock);
333         pthread_mutex_unlock(&ident->lock);
334         pthread_rwlock_unlock(limiter_rwlock);
335         return;
336     }
337
338     assert(result == sizeof(message_t));
339
340     message_to_hbo(&msg);
341     assert(msg.magic == MAGIC_MSG);
342
343     pthread_rwlock_rdlock(limiter_rwlock);
344     pthread_mutex_lock(&ident->lock);
345
346     switch (ident->comm_fabric) {
347         case COMM_GOSSIP: {
348             ident->gossip.value += msg.value;
349             ident->gossip.weight += msg.weight;
350         }
351         break;
352
353         default: {
354             assert(1 == 0); /* This case shouldn't happen. Punt for now... */
355         }
356     }
357     pthread_mutex_unlock(&ident->lock);
358     pthread_rwlock_unlock(limiter_rwlock);
359 }
360
361 static void ident_receive(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock) {
362     int select_result, i;
363     fd_set fds_copy;
364     struct timeval timeout;
365
366     FD_ZERO(&fds_copy);
367     timeout.tv_sec = 15;
368     timeout.tv_usec = 0;
369
370     pthread_rwlock_rdlock(limiter_rwlock);
371     pthread_mutex_lock(&ident->lock);
372     memcpy(&fds_copy, &ident->fds, sizeof(fd_set));
373     pthread_mutex_unlock(&ident->lock);
374     pthread_rwlock_unlock(limiter_rwlock);
375     
376     /* mask interrupt signals for this thread? */
377
378     select_result = select(FD_SETSIZE, &fds_copy, NULL, NULL, &timeout);
379
380     assert(select_result >= 0);
381     
382     if (select_result == 0)
383         return; /* Timed out */
384
385     for (i = 0; (i < FD_SETSIZE) && select_result; ++i) {
386         if (FD_ISSET(i, &fds_copy)) {
387             read_tcp_message(ident, limiter_rwlock, i);
388             select_result--;
389         }
390     }
391 }
392 #endif
393
394 /* Turn this on to simulate network partitions.
395  * Turn off for production settings. */
396 //#define ALLOW_PARTITION
397
398 int send_udp_mesh(comm_t *comm, uint32_t id, int sock) {
399     int result = 0;
400     remote_limiter_t *remote;
401     message_t msg;
402     struct sockaddr_in toaddr;
403     int i;
404
405 #ifdef ALLOW_PARTITION
406
407     int partition_count = 0;
408     struct in_addr dest;
409     char dest_ip[22];
410
411 #endif
412
413     memset(&toaddr, 0, sizeof(struct sockaddr_in));
414     toaddr.sin_family = AF_INET;
415
416     memset(&msg, 0, sizeof(message_t));
417     msg.magic = MAGIC_MSG;
418     msg.ident_id = id;
419     msg.value = comm->local_rate;
420     /* Do we want seqnos for mesh?  We can get by without them. */
421
422     message_to_nbo(&msg);
423
424     /* Iterate though and send update to all remote limiters in our identity. */
425     for (i = 0; i < comm->remote_node_count; ++i) {
426         remote = &comm->remote_limiters[i];
427
428 #ifdef ALLOW_PARTITION
429
430         if (do_partition) {
431             printlog(LOG_DEBUG, "Testing partition, partition set is %x, count is %d, test is %d.\n",
432                      partition_set, partition_count, partition_set & (1 << partition_count));
433             /* If the partition count bit isn't high in the set, don't actually send anything. */
434             if ((partition_set & (1 << partition_count)) == 0) {
435                 dest.s_addr = ntohl(remote->addr);
436                 strcpy(dest_ip, inet_ntoa(dest));
437
438                 printlog(LOG_DEBUG, "Partition: ignoring host %s\n", dest_ip);
439
440                 partition_count += 1;
441                 continue;
442             }
443         }
444
445         partition_count += 1;
446
447 #endif
448
449         toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
450         toaddr.sin_port = remote->port;
451         if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
452             printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
453             result = errno;
454             printlog(LOG_WARN, "  - The error was |%d|\n", strerror(result));
455             break;
456         }
457     }
458
459     return result;
460 }
461
462 int send_udp_gossip(comm_t *comm, uint32_t id, int sock) {
463     int i, j, targetid;
464     int awol_threshold = GOSSIP_REMOTE_AWOL_THRESHOLD;
465     int rand_count; //HACK...
466     int result = 0;
467     remote_limiter_t *remote;
468     struct sockaddr_in toaddr;
469     double msg_value, msg_weight;
470
471     /* This is the factor for the portion of value/weight to keep locally.
472      * Normally this is 1, meaning that we retain the same amount of value/weight
473      * that was sent to the peers.  In the case of not being able to send to a
474      * peer though, we increment this to reclaim the value/weight locally. */
475     int message_portion = 1;
476     
477     memset(&toaddr, 0, sizeof(struct sockaddr_in));
478     toaddr.sin_family = AF_INET;
479
480     msg_value = comm->gossip.value / (comm->gossip.gossip_branch + 1);
481     msg_weight = comm->gossip.weight / (comm->gossip.gossip_branch + 1);
482
483     for (i = 0; i < comm->gossip.gossip_branch; ++i) {
484         message_t msg;
485
486         printlog(LOG_DEBUG, "Gossip loop iteration, i=%d, branch=%d\n", i, comm->gossip.gossip_branch);
487
488         if (comm->retrys[i] >= 0) {
489             remote = &comm->remote_limiters[comm->retrys[i]];
490             targetid = comm->retrys[i];
491
492             if (remote->awol > awol_threshold) {
493                 message_portion += 1;
494                 printlog(LOG_DEBUG, "Gossip: Ignoring AWOL peer id %d.\n", comm->retrys[i]);
495                 comm->retrys[i] = -1;
496                 continue;
497             }
498         } else {
499             targetid = -2;
500             rand_count = 0;
501
502             while (targetid == -2 && rand_count < 50) {
503                 targetid = myrand() % comm->remote_node_count;
504                 rand_count += 1;
505
506                 /* Don't select an already-used index. */
507                 for (j = 0; j < comm->gossip.gossip_branch; ++j) {
508                     if (targetid == comm->retrys[j] || comm->remote_limiters[targetid].awol > awol_threshold) {
509                         printlog(LOG_DEBUG, "Gossip: disqualified targetid %d.  retrys[j] is %d, and remote awol count is %d\n", targetid, comm->retrys[j], comm->remote_limiters[targetid].awol);
510                         targetid = -2;
511                         break;
512                     }
513                 }
514             }
515
516             if (targetid < 0) {
517                 /* Couldn't find a suitable peer to send to... */
518                 message_portion += 1;
519                 printlog(LOG_DEBUG, "Gossip: exhausted random peer search.\n");
520                 continue;
521             } else {
522                 printlog(LOG_DEBUG, "Gossip: settled on peer id %d.\n", targetid);
523             }
524
525             remote = &comm->remote_limiters[targetid];
526         }
527         
528         comm->retrys[i] = targetid;
529
530         toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
531         toaddr.sin_port = remote->port;
532
533         memset(&msg, 0, sizeof(message_t));
534         msg.magic = MAGIC_MSG;
535         msg.ident_id = id;
536         msg.value = msg_value + remote->outgoing.saved_value;
537         msg.weight = msg_weight + remote->outgoing.saved_weight;
538         msg.seqno = remote->outgoing.next_seqno;
539         msg.min_seqno = remote->outgoing.first_seqno;
540         msg.type = MSG;
541
542         remote->outgoing.next_seqno++;
543         remote->outgoing.saved_value += msg_value;
544         remote->outgoing.saved_weight += msg_weight;
545
546 #ifdef ALLOW_PARTITION
547
548         if (do_partition && ((partition_set & (1 << targetid)) == 0)) {
549             printlog(LOG_DEBUG, "Partition: Gossip: ignoring targetid %d\n", targetid);
550             continue;
551         }
552
553 #endif
554
555         message_to_nbo(&msg);
556
557         if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
558             printlog(LOG_WARN, "WARN: limiter_send_gossip: sento failed.\n");
559             result = errno;
560             break;
561         }
562     }
563
564     comm->gossip.value = msg_value * message_portion;
565     comm->gossip.weight = msg_weight * message_portion;
566
567     return result;
568 }
569
570 #if 0
571 int send_tcp_gossip(comm_ident_t *ident, FILE *logfile, int unused) {
572     int i, targetid, sock;
573     int result = 0;
574     message_t msg;
575
576     memset(&msg, 0, sizeof(message_t));
577     msg.magic = MAGIC_MSG;
578     msg.ident_id = ident->ident_id;
579     msg.value = ident->gossip.value / (ident->gossip.gossip_branch + 1);
580     msg.weight = ident->gossip.weight / (ident->gossip.gossip_branch + 1);
581
582     message_to_nbo(&msg);
583
584     for (i = 0; i < ident->gossip.gossip_branch; ++i) {
585         targetid = myrand() % ident->remote_node_count;
586         sock = ident->remote_limiters[targetid].socket;
587
588         result = send(sock, &msg, sizeof(message_t), 0);
589         if (result < 0) {
590             result = errno;
591             FD_CLR(sock, &ident->fds);
592             close(sock);
593             break;
594         }
595
596         assert(result == sizeof(message_t));
597     }
598
599     ident->gossip.value /= (ident->gossip.gossip_branch + 1);
600     ident->gossip.weight /= (ident->gossip.gossip_branch + 1);
601
602     return result;
603 }
604 #endif
605
606 #if 0
607 void *limiter_accept_thread(void *limiter) {
608     sigset_t signal_mask;
609
610     sigemptyset(&signal_mask);
611     sigaddset(&signal_mask, SIGHUP);
612     pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
613
614     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
615     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
616     while (1) {
617         limiter_accept((comm_limiter_t *) limiter);
618     }
619     pthread_exit(NULL);
620 }
621 void *ident_receive_thread(void *recv_args) {
622     int i, sock, result;
623     struct recv_thread_args *args = (struct recv_thread_args *) recv_args;
624     comm_ident_t *ident = args->ident;
625     pthread_rwlock_t *lock = args->lock;
626     uint16_t port = args->port;
627     struct sockaddr_in addr;
628     socklen_t addrlen = sizeof(addr);
629     hello_t hello;
630
631     free(args);
632
633     while (1) {
634         memset(&hello, 0, sizeof(hello_t));
635
636         /*Try to connect to all remote nodes if they aren't already connected.*/
637         pthread_rwlock_rdlock(lock);
638         pthread_mutex_lock(&ident->lock);
639
640         hello.magic = MAGIC_HELLO;
641         hello.ident_id = ident->ident_id;
642         hello.port = ntohs(port);
643
644         hello_to_nbo(&hello);
645
646         for (i = 0; i < ident->remote_node_count; ++i) {
647             if (is_connected(&ident->remote_limiters[i]))
648                 continue; /* Ignore if it's already connected */
649
650             sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
651             if (sock < 0) {
652                 perror("socket");
653                 continue;
654             }
655
656             assert(sock >= 0);
657
658             memset(&addr, 0, sizeof(struct sockaddr_in));
659             addr.sin_family = AF_INET;
660             addr.sin_port = ident->remote_limiters[i].port;
661             addr.sin_addr.s_addr = ident->remote_limiters[i].addr;
662
663             result = connect(sock, (struct sockaddr *) &addr, addrlen);
664             if (result < 0) {
665                 close(sock);
666                 continue;
667             }
668
669             result = send(sock, &hello, sizeof(hello_t), 0);
670             if (result < 0) {
671                 close(sock);
672                 continue;
673             }
674
675             assert(result == sizeof(hello_t));
676
677             ident->remote_limiters[i].socket = sock;
678             printf("Connected on socket: %d\n", sock);
679             FD_SET(sock, &ident->fds);
680         }
681
682         pthread_rwlock_unlock(lock);
683         pthread_mutex_unlock(&ident->lock);
684
685         ident_receive(ident, lock);
686     }
687     pthread_exit(NULL);
688 }
689 #endif