1 /* See the DRL-LICENSE file for this file's software license. */
3 #define _XOPEN_SOURCE 600
9 /* Socket functions. */
10 #include <sys/types.h>
11 #include <sys/socket.h>
13 /* Byte ordering and address structures. */
14 #include <arpa/inet.h>
19 /* close() & usleep */
22 /* Mutex lock/unlock. */
28 /* select() w/ timeout */
29 #include <sys/select.h>
35 /* sigaddset(), sigemptyset(), SIGHUP, etc. */
38 /* DRL data structures. */
39 #include "raterouter.h"
40 #include "ratetypes.h"
41 #include "drl_state.h"
42 #include "peer_comm.h"
45 extern limiter_t limiter;
47 static const uint32_t MAGIC_MSG = 0x123123;
48 static const uint32_t MAGIC_HELLO = 0x456456;
49 static const uint16_t MSG = 1;
50 static const uint16_t ACK = 2;
52 static void message_to_hbo(message_t *msg) {
53 msg->magic = ntohl(msg->magic);
54 msg->ident_id = ntohl(msg->ident_id);
55 msg->seqno = ntohl(msg->seqno);
56 msg->min_seqno = ntohl(msg->min_seqno);
57 msg->type = ntohs(msg->type);
58 /* value is a double */
59 /* weight is a double */
62 static void message_to_nbo(message_t *msg) {
63 msg->magic = htonl(msg->magic);
64 msg->ident_id = htonl(msg->ident_id);
65 msg->seqno = htonl(msg->seqno);
66 msg->min_seqno = htonl(msg->min_seqno);
67 msg->type = htons(msg->type);
68 /* value is a double */
69 /* weight is a double */
72 static void hello_to_hbo(hello_t *hello) {
73 hello->magic = ntohl(hello->magic);
74 hello->ident_id = ntohl(hello->ident_id);
75 hello->port = ntohs(hello->port);
78 static void hello_to_nbo(hello_t *hello) {
79 hello->magic = htonl(hello->magic);
80 hello->ident_id = htonl(hello->ident_id);
81 hello->port = htons(hello->port);
84 static int is_connected(remote_limiter_t *remote) {
85 struct sockaddr_in addr;
86 socklen_t addrlen = sizeof(addr);
88 if (getpeername(remote->socket, (struct sockaddr *) &addr, &addrlen) == 0)
94 static int send_ack(identity_t *ident, remote_limiter_t *remote, uint32_t seqno) {
97 struct sockaddr_in toaddr;
99 memset(&toaddr, 0, sizeof(struct sockaddr_in));
100 toaddr.sin_family = AF_INET;
102 toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
103 toaddr.sin_port = remote->port;
105 memset(&msg, 0, sizeof(msg));
106 msg.magic = MAGIC_MSG;
107 msg.ident_id = ident->id;
111 message_to_nbo(&msg);
113 if (sendto(limiter.udp_socket, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
114 printlog(LOG_WARN, "send_ack: sento failed.\n");
121 void limiter_receive() {
122 struct sockaddr_in fromaddr;
123 remote_node_t sender;
124 socklen_t fromlen = sizeof(fromaddr);
125 identity_t *ident = NULL;
126 remote_limiter_t *remote = NULL;
129 if (recvfrom(limiter.udp_socket, &msg, sizeof(msg), MSG_WAITALL, (struct sockaddr *) &fromaddr, (socklen_t *) &fromlen) != sizeof(msg)) {
130 /* recv failed. Log and continue. */
131 printlog(LOG_WARN, "recv failed to read full message.\n");
134 memset(&sender, 0, sizeof(remote_node_t));
135 sender.addr = fromaddr.sin_addr.s_addr;
136 sender.port = fromaddr.sin_port;
138 message_to_hbo(&msg);
140 assert(msg.magic == MAGIC_MSG);
143 printlog(LOG_WARN, "Rcvd (value, weight) : (%f, %f) from ident %d (net order host 0x%x port %d) key size(%d)\n",
144 msg.value, msg.weight, msg.ident_id, sender.addr,sender.port,sizeof(remote_node_t));
146 pthread_testcancel();
148 pthread_rwlock_rdlock(&limiter.limiter_lock);
150 ident = map_search(limiter.stable_instance.ident_map, &msg.ident_id,
151 sizeof(msg.ident_id));
154 printlog(LOG_WARN, "WARN:recvd message for unknown identity.\n");
155 pthread_rwlock_unlock(&limiter.limiter_lock);
159 pthread_mutex_lock(&ident->comm.lock);
161 remote = map_search(ident->comm.remote_node_map, &sender, sizeof(remote_node_t));
163 if (remote == NULL) {
164 printlog(LOG_WARN, "WARN: recvd msg from unknown entity.\n");
165 pthread_mutex_unlock(&ident->comm.lock);
166 pthread_rwlock_unlock(&limiter.limiter_lock);
170 switch (ident->comm.comm_fabric) {
172 /* Use the message's value to be our new GRDrate/FPSweight for the
173 * message's sender. */
174 remote->rate = msg.value;
176 /* Reset the AWOL counter to zero since we received an update. */
182 if (msg.type == ACK) {
183 if (msg.seqno == remote->outgoing.next_seqno - 1) {
186 /* Ack for most recent message. Clear saved state. */
187 remote->outgoing.first_seqno = remote->outgoing.next_seqno;
188 remote->outgoing.saved_value = 0;
189 remote->outgoing.saved_weight = 0;
191 for (i = 0; i < ident->comm.gossip.gossip_branch; ++i) {
192 if (ident->comm.retrys[i] >= 0 &&
193 remote == &ident->comm.remote_limiters[ident->comm.retrys[i]]) {
194 //printf("clearing spot %d, it was %d\n", i, ident->retrys[i]);
195 ident->comm.retrys[i] = -2;
199 /* Ignore ack if it isn't for most recent message. */
201 if (msg.min_seqno > remote->incoming.seen_seqno) {
202 /* Entirely new information */
203 remote->incoming.seen_seqno = msg.seqno;
204 remote->incoming.saved_value = msg.value;
205 remote->incoming.saved_weight = msg.weight;
206 ident->comm.gossip.value += msg.value;
207 ident->comm.gossip.weight += msg.weight;
208 send_ack(ident, remote, msg.seqno);
209 } else if (msg.seqno > remote->incoming.seen_seqno) {
210 /* Only some of the message is old news. */
211 double diff_value = msg.value - remote->incoming.saved_value;
212 double diff_weight = msg.weight - remote->incoming.saved_weight;
214 remote->incoming.seen_seqno = msg.seqno;
215 remote->incoming.saved_value = msg.value;
216 remote->incoming.saved_weight = msg.weight;
218 ident->comm.gossip.value += diff_value;
219 ident->comm.gossip.weight += diff_weight;
220 send_ack(ident, remote, msg.seqno);
222 /* The entire message is old news. (Duplicate). */
230 printlog(LOG_CRITICAL, "ERR: Unknown identity comm fabric.\n");
234 pthread_mutex_unlock(&ident->comm.lock);
235 pthread_rwlock_unlock(&limiter.limiter_lock);
239 static void limiter_accept(comm_limiter_t *limiter) {
241 struct sockaddr_in fromaddr;
242 socklen_t fromlen = sizeof(fromaddr);
243 remote_node_t sender;
244 remote_limiter_t *remote;
247 ident_handle *handle = NULL;
249 sock = accept(limiter->tcp_socket, (struct sockaddr *)&fromaddr, &fromlen);
253 memset(&hello, 0, sizeof(hello_t));
254 result = recv(sock, &hello, sizeof(hello_t), 0);
258 return; /* Failure - ignore it. */
261 assert(result == sizeof(hello_t));
263 hello_to_hbo(&hello);
265 assert(hello.magic == MAGIC_HELLO);
267 memset(&sender, 0, sizeof(remote_node_t));
268 sender.addr = fromaddr.sin_addr.s_addr;
269 sender.port = ntohs(hello.port);
271 pthread_testcancel();
273 pthread_rwlock_rdlock(&limiter->rwlock);
275 handle = map_search(limiter->ident_id_to_handle, (void *) &hello.ident_id, sizeof(hello.ident_id));
277 if (handle == NULL) {
278 printlog(LOG_WARN, "WARN:recvd hello for unknown identity.\n");
279 pthread_rwlock_unlock(&limiter->rwlock);
283 ident = limiter->identities[*handle];
284 assert(ident != NULL);
286 pthread_mutex_lock(&ident->lock);
288 remote = map_search(ident->remote_node_map, &sender, sizeof(remote_node_t));
290 if (remote == NULL) {
291 printlog(LOG_WARN, "WARN: Accepted connection from unknown identity.\n");
292 pthread_mutex_unlock(&ident->lock);
293 pthread_rwlock_unlock(&limiter->rwlock);
298 if (is_connected(remote)) {
299 /* We are still connected, don't need the new socket. */
301 pthread_mutex_unlock(&ident->lock);
302 pthread_rwlock_unlock(&limiter->rwlock);
306 /* We weren't connected, but we are now... */
307 remote->socket = sock;
308 printf("Got connection on: %d\n", sock);
309 FD_SET(sock, &ident->fds);
311 pthread_mutex_unlock(&ident->lock);
312 pthread_rwlock_unlock(&limiter->rwlock);
315 static void read_tcp_message(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock, int sock) {
319 memset(&msg, 0, sizeof(message_t));
321 result = recv(sock, &msg, sizeof(message_t), 0);
324 pthread_rwlock_rdlock(limiter_rwlock);
325 pthread_mutex_lock(&ident->lock);
326 FD_CLR(sock, &ident->fds);
328 pthread_mutex_unlock(&ident->lock);
329 pthread_rwlock_unlock(limiter_rwlock);
333 assert(result == sizeof(message_t));
335 message_to_hbo(&msg);
336 assert(msg.magic == MAGIC_MSG);
338 pthread_rwlock_rdlock(limiter_rwlock);
339 pthread_mutex_lock(&ident->lock);
341 switch (ident->comm_fabric) {
343 ident->gossip.value += msg.value;
344 ident->gossip.weight += msg.weight;
349 assert(1 == 0); /* This case shouldn't happen. Punt for now... */
352 pthread_mutex_unlock(&ident->lock);
353 pthread_rwlock_unlock(limiter_rwlock);
356 static void ident_receive(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock) {
357 int select_result, i;
359 struct timeval timeout;
365 pthread_rwlock_rdlock(limiter_rwlock);
366 pthread_mutex_lock(&ident->lock);
367 memcpy(&fds_copy, &ident->fds, sizeof(fd_set));
368 pthread_mutex_unlock(&ident->lock);
369 pthread_rwlock_unlock(limiter_rwlock);
371 /* mask interrupt signals for this thread? */
373 select_result = select(FD_SETSIZE, &fds_copy, NULL, NULL, &timeout);
375 assert(select_result >= 0);
377 if (select_result == 0)
378 return; /* Timed out */
380 for (i = 0; (i < FD_SETSIZE) && select_result; ++i) {
381 if (FD_ISSET(i, &fds_copy)) {
382 read_tcp_message(ident, limiter_rwlock, i);
389 int send_udp_mesh(comm_t *comm, uint32_t id, int sock) {
391 remote_limiter_t *remote;
393 struct sockaddr_in toaddr;
395 memset(&toaddr, 0, sizeof(struct sockaddr_in));
396 toaddr.sin_family = AF_INET;
398 memset(&msg, 0, sizeof(message_t));
399 msg.magic = MAGIC_MSG;
401 msg.value = comm->local_rate;
402 /* Do we want seqnos for mesh? We can get by without them. */
404 message_to_nbo(&msg);
406 /* Iterate though and send update to all remote limiters in our identity. */
407 map_reset_iterate(comm->remote_node_map);
408 while ((remote = map_next(comm->remote_node_map))) {
409 toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
410 toaddr.sin_port = remote->port;
411 if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
412 printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
414 printlog(LOG_WARN, " - The error was |%d|\n", strerror(result));
422 int send_udp_gossip(comm_t *comm, uint32_t id, int sock) {
425 remote_limiter_t *remote;
426 struct sockaddr_in toaddr;
427 double msg_value, msg_weight;
429 memset(&toaddr, 0, sizeof(struct sockaddr_in));
430 toaddr.sin_family = AF_INET;
432 msg_value = comm->gossip.value / (comm->gossip.gossip_branch + 1);
433 msg_weight = comm->gossip.weight / (comm->gossip.gossip_branch + 1);
435 for (i = 0; i < comm->gossip.gossip_branch; ++i) {
438 if (comm->retrys[i] >= 0) {
439 remote = &comm->remote_limiters[comm->retrys[i]];
440 targetid = comm->retrys[i];
441 //printf("%d:d:%d, ", i, comm->retrys[i]);
445 while (targetid == -2) {
446 targetid = myrand() % comm->remote_node_count;
448 for (j = 0; j < comm->gossip.gossip_branch; ++j) {
449 if (targetid == comm->retrys[j]) {
456 remote = &comm->remote_limiters[targetid];
457 //printf("%d:r:%d, ", i, targetid);
460 toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
461 toaddr.sin_port = remote->port;
463 memset(&msg, 0, sizeof(message_t));
464 msg.magic = MAGIC_MSG;
466 msg.value = msg_value + remote->outgoing.saved_value;
467 msg.weight = msg_weight + remote->outgoing.saved_weight;
468 msg.seqno = remote->outgoing.next_seqno;
469 msg.min_seqno = remote->outgoing.first_seqno;
472 remote->outgoing.next_seqno++;
473 remote->outgoing.saved_value += msg_value;
474 remote->outgoing.saved_weight += msg_weight;
476 message_to_nbo(&msg);
478 if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
479 printlog(LOG_WARN, "WARN: limiter_send_gossip: sento failed.\n");
484 comm->retrys[i] = targetid;
488 comm->gossip.value = msg_value;
489 comm->gossip.weight = msg_weight;
495 int send_tcp_gossip(comm_ident_t *ident, FILE *logfile, int unused) {
496 int i, targetid, sock;
500 memset(&msg, 0, sizeof(message_t));
501 msg.magic = MAGIC_MSG;
502 msg.ident_id = ident->ident_id;
503 msg.value = ident->gossip.value / (ident->gossip.gossip_branch + 1);
504 msg.weight = ident->gossip.weight / (ident->gossip.gossip_branch + 1);
506 message_to_nbo(&msg);
508 for (i = 0; i < ident->gossip.gossip_branch; ++i) {
509 targetid = myrand() % ident->remote_node_count;
510 sock = ident->remote_limiters[targetid].socket;
512 result = send(sock, &msg, sizeof(message_t), 0);
515 FD_CLR(sock, &ident->fds);
520 assert(result == sizeof(message_t));
523 ident->gossip.value /= (ident->gossip.gossip_branch + 1);
524 ident->gossip.weight /= (ident->gossip.gossip_branch + 1);
531 void *limiter_accept_thread(void *limiter) {
532 sigset_t signal_mask;
534 sigemptyset(&signal_mask);
535 sigaddset(&signal_mask, SIGHUP);
536 pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
538 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
539 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
541 limiter_accept((comm_limiter_t *) limiter);
545 void *ident_receive_thread(void *recv_args) {
547 struct recv_thread_args *args = (struct recv_thread_args *) recv_args;
548 comm_ident_t *ident = args->ident;
549 pthread_rwlock_t *lock = args->lock;
550 uint16_t port = args->port;
551 struct sockaddr_in addr;
552 socklen_t addrlen = sizeof(addr);
558 memset(&hello, 0, sizeof(hello_t));
560 /*Try to connect to all remote nodes if they aren't already connected.*/
561 pthread_rwlock_rdlock(lock);
562 pthread_mutex_lock(&ident->lock);
564 hello.magic = MAGIC_HELLO;
565 hello.ident_id = ident->ident_id;
566 hello.port = ntohs(port);
568 hello_to_nbo(&hello);
570 for (i = 0; i < ident->remote_node_count; ++i) {
571 if (is_connected(&ident->remote_limiters[i]))
572 continue; /* Ignore if it's already connected */
574 sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
582 memset(&addr, 0, sizeof(struct sockaddr_in));
583 addr.sin_family = AF_INET;
584 addr.sin_port = ident->remote_limiters[i].port;
585 addr.sin_addr.s_addr = ident->remote_limiters[i].addr;
587 result = connect(sock, (struct sockaddr *) &addr, addrlen);
593 result = send(sock, &hello, sizeof(hello_t), 0);
599 assert(result == sizeof(hello_t));
601 ident->remote_limiters[i].socket = sock;
602 printf("Connected on socket: %d\n", sock);
603 FD_SET(sock, &ident->fds);
606 pthread_rwlock_unlock(lock);
607 pthread_mutex_unlock(&ident->lock);
609 ident_receive(ident, lock);