cb306574680c90557a9789b7c9634887207ee5c0
[distributedratelimiting.git] / drl / peer_comm.c
1 /* See the DRL-LICENSE file for this file's software license. */
2
3 #define _XOPEN_SOURCE 600
4
5 /* Debug output. */
6 #include <stdio.h>
7 #include <stdlib.h>
8
9 /* Socket functions. */
10 #include <sys/types.h>
11 #include <sys/socket.h>
12
13 /* Byte ordering and address structures. */
14 #include <arpa/inet.h>
15
16 /* memset() */
17 #include <string.h>
18
19 /* close() & usleep */
20 #include <unistd.h>
21
22 /* Mutex lock/unlock. */
23 #include <pthread.h>
24
25 /* perror() */
26 #include <errno.h>
27
28 /* select() w/ timeout */
29 #include <sys/select.h>
30 #include <sys/time.h>
31
32 /* assert() */
33 #include <assert.h>
34
35 /* sigaddset(), sigemptyset(), SIGHUP, etc. */
36 #include <signal.h>
37
38 /* DRL data structures. */
39 #include "raterouter.h"
40 #include "ratetypes.h"
41 #include "drl_state.h"
42 #include "peer_comm.h"
43 #include "logging.h"
44
45 extern limiter_t limiter;
46
47 static const uint32_t MAGIC_MSG = 0x123123;
48 static const uint32_t MAGIC_HELLO = 0x456456;
49 static const uint16_t MSG = 1;
50 static const uint16_t ACK = 2;
51
52 static void message_to_hbo(message_t *msg) {
53     msg->magic = ntohl(msg->magic);
54     msg->ident_id = ntohl(msg->ident_id);
55     msg->seqno = ntohl(msg->seqno);
56     msg->min_seqno = ntohl(msg->min_seqno);
57     msg->type = ntohs(msg->type);
58     /* value is a double */
59     /* weight is a double */
60 }
61
62 static void message_to_nbo(message_t *msg) {
63     msg->magic = htonl(msg->magic);
64     msg->ident_id = htonl(msg->ident_id);
65     msg->seqno = htonl(msg->seqno);
66     msg->min_seqno = htonl(msg->min_seqno);
67     msg->type = htons(msg->type);
68     /* value is a double */
69     /* weight is a double */
70 }
71
72 static void hello_to_hbo(hello_t *hello) {
73     hello->magic = ntohl(hello->magic);
74     hello->ident_id = ntohl(hello->ident_id);
75     hello->port = ntohs(hello->port);
76 }
77
78 static void hello_to_nbo(hello_t *hello) {
79     hello->magic = htonl(hello->magic);
80     hello->ident_id = htonl(hello->ident_id);
81     hello->port = htons(hello->port);
82 }
83
84 static int is_connected(remote_limiter_t *remote) {
85     struct sockaddr_in addr;
86     socklen_t addrlen = sizeof(addr);
87
88     if (getpeername(remote->socket, (struct sockaddr *) &addr, &addrlen) == 0)
89         return 1;
90     else
91         return 0;
92 }
93
94 static int send_ack(identity_t *ident, remote_limiter_t *remote, uint32_t seqno) {
95     int result = 0;
96     message_t msg;
97     struct sockaddr_in toaddr;
98
99     memset(&toaddr, 0, sizeof(struct sockaddr_in));
100     toaddr.sin_family = AF_INET;
101
102     toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
103     toaddr.sin_port = remote->port;
104
105     memset(&msg, 0, sizeof(msg));
106     msg.magic = MAGIC_MSG;
107     msg.ident_id = ident->id;
108     msg.type = ACK;
109     msg.seqno = seqno;
110
111     message_to_nbo(&msg);
112
113     if (sendto(limiter.udp_socket, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
114         printlog(LOG_WARN, "send_ack: sento failed.\n");
115         result = errno;
116     }
117
118     return result;
119 }
120
121 void limiter_receive() {
122     struct sockaddr_in fromaddr;
123     remote_node_t sender;
124     socklen_t fromlen = sizeof(fromaddr);
125     identity_t *ident = NULL;
126     remote_limiter_t *remote = NULL;
127     message_t msg;
128
129     if (recvfrom(limiter.udp_socket, &msg, sizeof(msg), MSG_WAITALL, (struct sockaddr *) &fromaddr, (socklen_t *) &fromlen) != sizeof(msg)) {
130         /* recv failed.  Log and continue. */
131         printlog(LOG_WARN, "recv failed to read full message.\n");
132         return;
133     }
134     memset(&sender, 0, sizeof(remote_node_t));
135     sender.addr = fromaddr.sin_addr.s_addr;
136     sender.port = fromaddr.sin_port;
137
138     message_to_hbo(&msg);
139
140     assert(msg.magic == MAGIC_MSG);
141
142 #if 0
143     printlog(LOG_WARN, "Rcvd (value, weight) : (%f, %f) from ident %d (net order host 0x%x port %d) key size(%d)\n", 
144             msg.value, msg.weight, msg.ident_id, sender.addr,sender.port,sizeof(remote_node_t));
145 #endif
146     pthread_testcancel();
147
148     pthread_rwlock_rdlock(&limiter.limiter_lock);
149
150     ident = map_search(limiter.stable_instance.ident_map, &msg.ident_id,
151                        sizeof(msg.ident_id));
152
153     if (ident == NULL) {
154         printlog(LOG_WARN, "WARN:recvd message for unknown identity.\n");
155         pthread_rwlock_unlock(&limiter.limiter_lock);
156         return;
157     }
158
159     pthread_mutex_lock(&ident->comm.lock);
160
161     remote = map_search(ident->comm.remote_node_map, &sender, sizeof(remote_node_t));
162
163     if (remote == NULL) {
164         printlog(LOG_WARN, "WARN: recvd msg from unknown entity.\n");
165         pthread_mutex_unlock(&ident->comm.lock);
166         pthread_rwlock_unlock(&limiter.limiter_lock);
167         return;
168     }
169
170     switch (ident->comm.comm_fabric) {
171         case COMM_MESH: {
172             /* Use the message's value to be our new GRDrate/FPSweight for the
173              * message's sender. */
174             remote->rate = msg.value;
175
176             /* Reset the AWOL counter to zero since we received an update. */
177             remote->awol = 0;
178         }
179         break;
180
181         case COMM_GOSSIP: {
182             if (msg.type == ACK) {
183                 if (msg.seqno == remote->outgoing.next_seqno - 1) {
184                     int i;
185
186                     /* Ack for most recent message.  Clear saved state. */
187                     remote->outgoing.first_seqno = remote->outgoing.next_seqno;
188                     remote->outgoing.saved_value = 0;
189                     remote->outgoing.saved_weight = 0;
190
191                     for (i = 0; i < ident->comm.gossip.gossip_branch; ++i) {
192                         if (ident->comm.retrys[i] >= 0 &&
193                             remote == &ident->comm.remote_limiters[ident->comm.retrys[i]]) {
194                                 //printf("clearing spot %d, it was %d\n", i, ident->retrys[i]);
195                                 ident->comm.retrys[i] = -2;
196                         }
197                     }
198                 }
199                 /* Ignore ack if it isn't for most recent message. */
200             } else {
201                 if (msg.min_seqno > remote->incoming.seen_seqno) {
202                     /* Entirely new information */
203                     remote->incoming.seen_seqno = msg.seqno;
204                     remote->incoming.saved_value = msg.value;
205                     remote->incoming.saved_weight = msg.weight;
206                     ident->comm.gossip.value += msg.value;
207                     ident->comm.gossip.weight += msg.weight;
208                     send_ack(ident, remote, msg.seqno);
209                 } else if (msg.seqno > remote->incoming.seen_seqno) {
210                     /* Only some of the message is old news. */
211                     double diff_value = msg.value - remote->incoming.saved_value;
212                     double diff_weight = msg.weight - remote->incoming.saved_weight;
213
214                     remote->incoming.seen_seqno = msg.seqno;
215                     remote->incoming.saved_value = msg.value;
216                     remote->incoming.saved_weight = msg.weight;
217
218                     ident->comm.gossip.value += diff_value;
219                     ident->comm.gossip.weight += diff_weight;
220                     send_ack(ident, remote, msg.seqno);
221                 } else {
222                     /* The entire message is old news. (Duplicate). */
223                     /* Do nothing. */
224                 }
225             }
226         }
227         break;
228
229         default: {
230             printlog(LOG_CRITICAL, "ERR: Unknown identity comm fabric.\n");
231         }
232     }
233
234     pthread_mutex_unlock(&ident->comm.lock);
235     pthread_rwlock_unlock(&limiter.limiter_lock);
236 }
237
238 #if 0
239 static void limiter_accept(comm_limiter_t *limiter) {
240     int sock, result;
241     struct sockaddr_in fromaddr;
242     socklen_t fromlen = sizeof(fromaddr);
243     remote_node_t sender;
244     remote_limiter_t *remote;
245     hello_t hello;
246     comm_ident_t *ident;
247     ident_handle *handle = NULL;
248
249     sock = accept(limiter->tcp_socket, (struct sockaddr *)&fromaddr, &fromlen);
250
251     assert(sock > 0);
252
253     memset(&hello, 0, sizeof(hello_t));
254     result = recv(sock, &hello, sizeof(hello_t), 0);
255
256     if (result < 0) {
257         close(sock);
258         return; /* Failure - ignore it. */
259     }
260
261     assert(result == sizeof(hello_t));
262
263     hello_to_hbo(&hello);
264
265     assert(hello.magic == MAGIC_HELLO);
266
267     memset(&sender, 0, sizeof(remote_node_t));
268     sender.addr = fromaddr.sin_addr.s_addr;
269     sender.port = ntohs(hello.port);
270
271     pthread_testcancel();
272
273     pthread_rwlock_rdlock(&limiter->rwlock);
274
275     handle = map_search(limiter->ident_id_to_handle, (void *) &hello.ident_id, sizeof(hello.ident_id));
276
277     if (handle == NULL) {
278         printlog(LOG_WARN, "WARN:recvd hello for unknown identity.\n");
279         pthread_rwlock_unlock(&limiter->rwlock);
280         return;
281     }
282
283     ident = limiter->identities[*handle];
284     assert(ident != NULL);
285
286     pthread_mutex_lock(&ident->lock);
287
288     remote = map_search(ident->remote_node_map, &sender, sizeof(remote_node_t));
289
290     if (remote == NULL) {
291         printlog(LOG_WARN, "WARN: Accepted connection from unknown identity.\n");
292         pthread_mutex_unlock(&ident->lock);
293         pthread_rwlock_unlock(&limiter->rwlock);
294         close(sock);
295         return;
296     }
297
298     if (is_connected(remote)) {
299         /* We are still connected, don't need the new socket. */
300         close(sock);
301         pthread_mutex_unlock(&ident->lock);
302         pthread_rwlock_unlock(&limiter->rwlock);
303         return;
304     }
305
306     /* We weren't connected, but we are now... */
307     remote->socket = sock;
308     printf("Got connection on: %d\n", sock);
309     FD_SET(sock, &ident->fds);
310
311     pthread_mutex_unlock(&ident->lock);
312     pthread_rwlock_unlock(&limiter->rwlock);
313 }
314
315 static void read_tcp_message(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock, int sock) {
316     int result;
317     message_t msg;
318
319     memset(&msg, 0, sizeof(message_t));
320
321     result = recv(sock, &msg, sizeof(message_t), 0);
322
323     if (result < 0) {
324         pthread_rwlock_rdlock(limiter_rwlock);
325         pthread_mutex_lock(&ident->lock);
326         FD_CLR(sock, &ident->fds);
327         close(sock);
328         pthread_mutex_unlock(&ident->lock);
329         pthread_rwlock_unlock(limiter_rwlock);
330         return;
331     }
332
333     assert(result == sizeof(message_t));
334
335     message_to_hbo(&msg);
336     assert(msg.magic == MAGIC_MSG);
337
338     pthread_rwlock_rdlock(limiter_rwlock);
339     pthread_mutex_lock(&ident->lock);
340
341     switch (ident->comm_fabric) {
342         case COMM_GOSSIP: {
343             ident->gossip.value += msg.value;
344             ident->gossip.weight += msg.weight;
345         }
346         break;
347
348         default: {
349             assert(1 == 0); /* This case shouldn't happen. Punt for now... */
350         }
351     }
352     pthread_mutex_unlock(&ident->lock);
353     pthread_rwlock_unlock(limiter_rwlock);
354 }
355
356 static void ident_receive(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock) {
357     int select_result, i;
358     fd_set fds_copy;
359     struct timeval timeout;
360
361     FD_ZERO(&fds_copy);
362     timeout.tv_sec = 15;
363     timeout.tv_usec = 0;
364
365     pthread_rwlock_rdlock(limiter_rwlock);
366     pthread_mutex_lock(&ident->lock);
367     memcpy(&fds_copy, &ident->fds, sizeof(fd_set));
368     pthread_mutex_unlock(&ident->lock);
369     pthread_rwlock_unlock(limiter_rwlock);
370     
371     /* mask interrupt signals for this thread? */
372
373     select_result = select(FD_SETSIZE, &fds_copy, NULL, NULL, &timeout);
374
375     assert(select_result >= 0);
376     
377     if (select_result == 0)
378         return; /* Timed out */
379
380     for (i = 0; (i < FD_SETSIZE) && select_result; ++i) {
381         if (FD_ISSET(i, &fds_copy)) {
382             read_tcp_message(ident, limiter_rwlock, i);
383             select_result--;
384         }
385     }
386 }
387 #endif
388
389 int send_udp_mesh(comm_t *comm, uint32_t id, int sock) {
390     int result = 0;
391     remote_limiter_t *remote;
392     message_t msg;
393     struct sockaddr_in toaddr;
394
395     memset(&toaddr, 0, sizeof(struct sockaddr_in));
396     toaddr.sin_family = AF_INET;
397
398     memset(&msg, 0, sizeof(message_t));
399     msg.magic = MAGIC_MSG;
400     msg.ident_id = id;
401     msg.value = comm->local_rate;
402     /* Do we want seqnos for mesh?  We can get by without them. */
403
404     message_to_nbo(&msg);
405
406     /* Iterate though and send update to all remote limiters in our identity. */
407     map_reset_iterate(comm->remote_node_map);
408     while ((remote = map_next(comm->remote_node_map))) {
409         toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
410         toaddr.sin_port = remote->port;
411         if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
412             printlog(LOG_CRITICAL, "ERR: limiter_send_mesh: sento failed.\n");
413             result = errno;
414             printlog(LOG_CRITICAL, "  - The error was |%d|\n", strerror(result));
415             break;
416         }
417     }
418
419     return result;
420 }
421
422 int send_udp_gossip(comm_t *comm, uint32_t id, int sock) {
423     int i, j, targetid;
424     int result = 0;
425     remote_limiter_t *remote;
426     struct sockaddr_in toaddr;
427     double msg_value, msg_weight;
428
429     memset(&toaddr, 0, sizeof(struct sockaddr_in));
430     toaddr.sin_family = AF_INET;
431
432     msg_value = comm->gossip.value / (comm->gossip.gossip_branch + 1);
433     msg_weight = comm->gossip.weight / (comm->gossip.gossip_branch + 1);
434
435     for (i = 0; i < comm->gossip.gossip_branch; ++i) {
436         message_t msg;
437
438         if (comm->retrys[i] >= 0) {
439             remote = &comm->remote_limiters[comm->retrys[i]];
440             targetid = comm->retrys[i];
441             //printf("%d:d:%d, ", i, comm->retrys[i]);
442         } else {
443             targetid = -2;
444
445             while (targetid == -2) {
446                 targetid = myrand() % comm->remote_node_count;
447
448                 for (j = 0; j < comm->gossip.gossip_branch; ++j) {
449                     if (targetid == comm->retrys[j]) {
450                         targetid = -2;
451                         break;
452                     }
453                 }
454             }
455
456             remote = &comm->remote_limiters[targetid];
457             //printf("%d:r:%d, ", i, targetid);
458         }
459
460         toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
461         toaddr.sin_port = remote->port;
462
463         memset(&msg, 0, sizeof(message_t));
464         msg.magic = MAGIC_MSG;
465         msg.ident_id = id;
466         msg.value = msg_value + remote->outgoing.saved_value;
467         msg.weight = msg_weight + remote->outgoing.saved_weight;
468         msg.seqno = remote->outgoing.next_seqno;
469         msg.min_seqno = remote->outgoing.first_seqno;
470         msg.type = MSG;
471
472         remote->outgoing.next_seqno++;
473         remote->outgoing.saved_value += msg_value;
474         remote->outgoing.saved_weight += msg_weight;
475
476         message_to_nbo(&msg);
477
478         if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
479             printlog(LOG_CRITICAL, "ERR: limiter_send_gossip: sento failed.\n");
480             result = errno;
481             break;
482         }
483
484         comm->retrys[i] = targetid;
485     }
486     //printf("\n");
487
488     comm->gossip.value = msg_value;
489     comm->gossip.weight = msg_weight;
490
491     return result;
492 }
493
494 #if 0
495 int send_tcp_gossip(comm_ident_t *ident, FILE *logfile, int unused) {
496     int i, targetid, sock;
497     int result = 0;
498     message_t msg;
499
500     memset(&msg, 0, sizeof(message_t));
501     msg.magic = MAGIC_MSG;
502     msg.ident_id = ident->ident_id;
503     msg.value = ident->gossip.value / (ident->gossip.gossip_branch + 1);
504     msg.weight = ident->gossip.weight / (ident->gossip.gossip_branch + 1);
505
506     message_to_nbo(&msg);
507
508     for (i = 0; i < ident->gossip.gossip_branch; ++i) {
509         targetid = myrand() % ident->remote_node_count;
510         sock = ident->remote_limiters[targetid].socket;
511
512         result = send(sock, &msg, sizeof(message_t), 0);
513         if (result < 0) {
514             result = errno;
515             FD_CLR(sock, &ident->fds);
516             close(sock);
517             break;
518         }
519
520         assert(result == sizeof(message_t));
521     }
522
523     ident->gossip.value /= (ident->gossip.gossip_branch + 1);
524     ident->gossip.weight /= (ident->gossip.gossip_branch + 1);
525
526     return result;
527 }
528 #endif
529
530 #if 0
531 void *limiter_accept_thread(void *limiter) {
532     sigset_t signal_mask;
533
534     sigemptyset(&signal_mask);
535     sigaddset(&signal_mask, SIGHUP);
536     pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
537
538     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
539     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
540     while (1) {
541         limiter_accept((comm_limiter_t *) limiter);
542     }
543     pthread_exit(NULL);
544 }
545 void *ident_receive_thread(void *recv_args) {
546     int i, sock, result;
547     struct recv_thread_args *args = (struct recv_thread_args *) recv_args;
548     comm_ident_t *ident = args->ident;
549     pthread_rwlock_t *lock = args->lock;
550     uint16_t port = args->port;
551     struct sockaddr_in addr;
552     socklen_t addrlen = sizeof(addr);
553     hello_t hello;
554
555     free(args);
556
557     while (1) {
558         memset(&hello, 0, sizeof(hello_t));
559
560         /*Try to connect to all remote nodes if they aren't already connected.*/
561         pthread_rwlock_rdlock(lock);
562         pthread_mutex_lock(&ident->lock);
563
564         hello.magic = MAGIC_HELLO;
565         hello.ident_id = ident->ident_id;
566         hello.port = ntohs(port);
567
568         hello_to_nbo(&hello);
569
570         for (i = 0; i < ident->remote_node_count; ++i) {
571             if (is_connected(&ident->remote_limiters[i]))
572                 continue; /* Ignore if it's already connected */
573
574             sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
575             if (sock < 0) {
576                 perror("socket");
577                 continue;
578             }
579
580             assert(sock >= 0);
581
582             memset(&addr, 0, sizeof(struct sockaddr_in));
583             addr.sin_family = AF_INET;
584             addr.sin_port = ident->remote_limiters[i].port;
585             addr.sin_addr.s_addr = ident->remote_limiters[i].addr;
586
587             result = connect(sock, (struct sockaddr *) &addr, addrlen);
588             if (result < 0) {
589                 close(sock);
590                 continue;
591             }
592
593             result = send(sock, &hello, sizeof(hello_t), 0);
594             if (result < 0) {
595                 close(sock);
596                 continue;
597             }
598
599             assert(result == sizeof(hello_t));
600
601             ident->remote_limiters[i].socket = sock;
602             printf("Connected on socket: %d\n", sock);
603             FD_SET(sock, &ident->fds);
604         }
605
606         pthread_rwlock_unlock(lock);
607         pthread_mutex_unlock(&ident->lock);
608
609         ident_receive(ident, lock);
610     }
611     pthread_exit(NULL);
612 }
613 #endif