1 /* See the DRL-LICENSE file for this file's software license. */
3 #define _XOPEN_SOURCE 600
9 /* Socket functions. */
10 #include <sys/types.h>
11 #include <sys/socket.h>
13 /* Byte ordering and address structures. */
14 #include <arpa/inet.h>
19 /* close() & usleep */
22 /* Mutex lock/unlock. */
28 /* select() w/ timeout */
29 #include <sys/select.h>
35 /* sigaddset(), sigemptyset(), SIGHUP, etc. */
38 /* DRL data structures. */
39 #include "raterouter.h"
40 #include "ratetypes.h"
41 #include "drl_state.h"
42 #include "peer_comm.h"
/* Sentinel peer index meaning "no peer selected" (used by find_gossip_target
 * and send_udp_gossip). Real peer indices are array subscripts into
 * remote_limiters, so a negative value cannot collide with one. */
45 #define NULL_PEER (-2)
/* send_udp_mesh marks a remote UNREACHABLE once its awol counter exceeds this. */
46 #define MESH_REMOTE_AWOL_THRESHOLD (5)
/* NOTE(review): presumably the gossip counterpart of the threshold above; it is
 * not referenced in the visible part of this file — confirm where it is used. */
47 #define GOSSIP_REMOTE_AWOL_THRESHOLD (5)
49 /* From ulogd_DRL.c */
/* do_partition: when non-zero (and ALLOW_PARTITION is compiled in),
 * send_udp_gossip skips peers whose bit is clear in partition_set.
 * partition_set: bitmask of peer positions that may still be sent to; bit n
 * is tested as (partition_set & (1 << n)). */
50 extern int do_partition;
51 extern int partition_set;
/* Process-wide limiter state, defined elsewhere. This file uses its
 * udp_socket, limiter_lock and stable_instance.ident_map. */
53 extern limiter_t limiter;
55 void message_to_hbo(message_t *msg) {
56 msg->magic = ntohl(msg->magic);
57 msg->ident_id = ntohl(msg->ident_id);
58 /* value is a double */
59 /* weight is a double */
60 msg->seqno = ntohl(msg->seqno);
61 msg->min_seqno = ntohl(msg->min_seqno);
62 msg->type = ntohs(msg->type);
63 /* ping_source, ping_port, check_target, and check_port stay in nbo. */
64 msg->checkack_value = ntohl(msg->checkack_value);
65 msg->update_present = ntohl(msg->update_present);
66 msg->reachability = ntohl(msg->reachability);
67 msg->incarnation = ntohl(msg->incarnation);
68 /* node has two fields, both stay in nbo. */
69 msg->view = ntohl(msg->view);
72 void message_to_nbo(message_t *msg) {
73 msg->magic = htonl(msg->magic);
74 msg->ident_id = htonl(msg->ident_id);
75 /* value is a double */
76 /* weight is a double */
77 msg->seqno = htonl(msg->seqno);
78 msg->min_seqno = htonl(msg->min_seqno);
79 msg->type = htons(msg->type);
80 /* ping_source, ping_port, check_target, and check_port already in nbo. */
81 msg->checkack_value = htonl(msg->checkack_value);
82 msg->update_present = htonl(msg->update_present);
83 msg->reachability = htonl(msg->reachability);
84 msg->incarnation = htonl(msg->incarnation);
85 /* node has two fields, both already in nbo. */
86 msg->view = htonl(msg->view);
/*
 * send_ack: send one message_t datagram to `remote` over limiter.udp_socket.
 *
 * The destination address and port are copied from remote->addr and
 * remote->port unchanged, because they are already stored in network byte
 * order. The message is zeroed, stamped with MAGIC_MSG, and converted with
 * message_to_nbo() before sending.
 *
 * NOTE(review): this excerpt does not show:
 *   - the declaration of `msg`;
 *   - the statements that fill in id/seqno/type/view;
 *   - the return statements.
 * Presumably each parameter is copied into the matching message field
 * — confirm. A sendto() failure is logged as a warning.
 */
89 int send_ack(uint32_t id, remote_limiter_t *remote, uint32_t seqno, uint16_t type, int32_t view) {
92 struct sockaddr_in toaddr;
94 memset(&toaddr, 0, sizeof(struct sockaddr_in));
95 toaddr.sin_family = AF_INET;
97 toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
98 toaddr.sin_port = remote->port;
100 memset(&msg, 0, sizeof(msg));
101 msg.magic = MAGIC_MSG;
107 message_to_nbo(&msg);
/* NOTE(review): "sento" in the log text below is a typo for "sendto". */
109 if (sendto(limiter.udp_socket, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
110 printlog(LOG_WARN, "send_ack: sento failed.\n");
/*
 * limiter_receive: read one message_t datagram from limiter.udp_socket and
 * hand it to the owning identity's comm.recv_function.
 *
 * Visible flow:
 *   1. recvfrom() one full message_t. A short or failed read is logged.
 *      NOTE(review): the return after that warning is not visible in this
 *      excerpt — confirm the function stops there.
 *   2. Build the remote_node_t lookup key from the sender's address and port.
 *      Both stay in network byte order. The key is zeroed first, presumably
 *      so padding bytes do not perturb the byte-wise key given to map_search.
 *   3. Convert the message to host byte order and check its magic.
 *   4. Look up the identity, then the remote:
 *      - take limiter.limiter_lock (read), then ident->comm.lock;
 *      - look up the identity by msg.ident_id;
 *      - look up the remote by the sender key.
 *      An unknown identity or sender is logged and the locks taken so far are
 *      released. The enclosing if/return lines are not all visible here.
 *   5. Call ident->comm.recv_function, then release the mutex before the
 *      rwlock (reverse acquisition order).
 */
117 void limiter_receive() {
118 struct sockaddr_in fromaddr;
119 remote_node_t sender;
120 socklen_t fromlen = sizeof(fromaddr);
121 identity_t *ident = NULL;
122 remote_limiter_t *remote = NULL;
125 if (recvfrom(limiter.udp_socket, &msg, sizeof(msg), MSG_WAITALL, (struct sockaddr *) &fromaddr, (socklen_t *) &fromlen) != sizeof(msg)) {
126 /* recv failed. Log and continue. */
127 printlog(LOG_WARN, "recv failed to read full message.\n");
130 memset(&sender, 0, sizeof(remote_node_t));
131 sender.addr = fromaddr.sin_addr.s_addr;
132 sender.port = fromaddr.sin_port;
134 message_to_hbo(&msg);
/* NOTE(review): the magic comes straight off the network. A stray or malformed
 * datagram aborts the whole daemon here, and with NDEBUG nothing is checked at
 * all. Consider logging and dropping the message instead. */
136 assert(msg.magic == MAGIC_MSG);
/* NOTE(review): this fires at LOG_WARN for every received message. The last
 * argument is a size_t but is formatted with %d (format mismatch, undefined
 * behavior); %zu is the matching conversion. */
139 printlog(LOG_WARN, "Rcvd (value, weight) : (%f, %f) from ident %d (net order host 0x%x port %d) key size(%d)\n",
140 msg.value, msg.weight, msg.ident_id, sender.addr,sender.port,sizeof(remote_node_t));
/* Explicit cancellation point, taken before any lock is held. */
142 pthread_testcancel();
144 pthread_rwlock_rdlock(&limiter.limiter_lock);
146 ident = map_search(limiter.stable_instance.ident_map, &msg.ident_id,
147 sizeof(msg.ident_id));
150 printlog(LOG_WARN, "WARN:recvd message for unknown identity.\n");
151 pthread_rwlock_unlock(&limiter.limiter_lock);
155 pthread_mutex_lock(&ident->comm.lock);
157 remote = map_search(ident->comm.remote_node_map, &sender, sizeof(remote_node_t));
159 if (remote == NULL) {
160 printlog(LOG_WARN, "WARN: recvd msg from unknown entity.\n");
161 pthread_mutex_unlock(&ident->comm.lock);
162 pthread_rwlock_unlock(&limiter.limiter_lock);
166 /* Pass the message to the comm's recv function, which is responsible for
167 * processing its contents. */
168 ident->comm.recv_function(&ident->comm, ident->id, limiter.udp_socket, remote, &msg);
170 pthread_mutex_unlock(&ident->comm.lock);
171 pthread_rwlock_unlock(&limiter.limiter_lock);
/*
 * recv_mesh: mesh-fabric receive handler.
 *
 * Stores the sender's advertised value as remote->rate and marks the sender
 * REACHABLE. comm, id and sock are not used in the visible lines. The
 * signature matches the comm.recv_function call in limiter_receive, so
 * presumably it is shared with recv_gossip for that dispatch.
 *
 * NOTE(review): the statement that resets the AWOL counter (described by the
 * comment below) and the return value are not visible in this excerpt
 * — confirm.
 */
174 int recv_mesh(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) {
175 /* Use the message's value to be our new GRDrate/FPSweight for the
176 * message's sender. */
177 remote->rate = msg->value;
179 /* Reset the AWOL counter to zero since we received an update. */
181 remote->reachability = REACHABLE;
/*
 * send_udp_mesh: mesh-fabric send. Broadcasts our local rate to every remote
 * limiter of this identity over `sock`.
 *
 * A single message is built once:
 *   - value = comm->local_rate;
 *   - view = comm->gossip.view;
 *   - no seqno;
 *   - converted to network byte order.
 * The same buffer is then sent to each remote. Before each send, a remote
 * whose awol counter exceeds MESH_REMOTE_AWOL_THRESHOLD is marked UNREACHABLE.
 * With ALLOW_PARTITION, remotes whose position bit is clear in partition_set
 * are skipped.
 *
 * NOTE(review): this excerpt does not show:
 *   - the declarations of msg, i, dest, dest_ip and result;
 *   - the awol increment;
 *   - the end of the function.
 */
186 int send_udp_mesh(comm_t *comm, uint32_t id, int sock) {
188 remote_limiter_t *remote;
190 struct sockaddr_in toaddr;
193 #ifdef ALLOW_PARTITION
195 int partition_count = 0;
201 memset(&toaddr, 0, sizeof(struct sockaddr_in));
202 toaddr.sin_family = AF_INET;
204 memset(&msg, 0, sizeof(message_t));
205 msg.magic = MAGIC_MSG;
207 msg.value = comm->local_rate;
208 msg.view = comm->gossip.view;
209 /* Do we want seqnos for mesh? We can get by without them. */
211 message_to_nbo(&msg);
213 /* Iterate though and send update to all remote limiters in our identity. */
214 for (i = 0; i < comm->remote_node_count; ++i) {
215 remote = &comm->remote_limiters[i];
217 /* Increase this counter. For mesh, it represents the number of messages we have sent to
218 * this remote limiter without having heard from it. This is reset to 0 when we receive
219 * an update from this peer. */
221 if (remote->awol > MESH_REMOTE_AWOL_THRESHOLD) {
222 remote->reachability = UNREACHABLE;
225 #ifdef ALLOW_PARTITION
/* NOTE(review): partition_set is an int. Once partition_count reaches the bit
 * width of int, (1 << partition_count) is undefined behavior, so this only
 * works for small peer counts. */
228 printlog(LOG_DEBUG, "Testing partition, partition set is %x, count is %d, test is %d.\n",
229 partition_set, partition_count, partition_set & (1 << partition_count));
230 /* If the partition count bit isn't high in the set, don't actually send anything. */
231 if ((partition_set & (1 << partition_count)) == 0) {
/* NOTE(review): remote->addr is already in network byte order (see the
 * sendto setup below), and inet_ntoa() expects network byte order. The
 * ntohl() here therefore logs a byte-reversed IP on little-endian hosts.
 * strcpy() also assumes dest_ip holds at least 16 bytes ("255.255.255.255"
 * plus NUL); its declaration is not visible — confirm. */
232 dest.s_addr = ntohl(remote->addr);
233 strcpy(dest_ip, inet_ntoa(dest));
235 printlog(LOG_DEBUG, "Partition: ignoring host %s\n", dest_ip);
237 partition_count += 1;
242 partition_count += 1;
246 toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
247 toaddr.sin_port = remote->port;
248 if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
249 printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
/* NOTE(review): strerror() returns char *, but it is formatted with %d, which
 * is a format mismatch and undefined behavior. %s is the matching
 * conversion. Where `result` gets its errno value is not visible here. */
251 printlog(LOG_WARN, " - The error was |%d|\n", strerror(result));
/*
 * recv_gossip: gossip-fabric receive handler for one message from `remote`.
 *
 * ACK: if it acknowledges our most recent message to this peer
 * (seqno == next_seqno - 1), clear the saved outgoing value/weight and advance
 * first_seqno. The retransmission state is then empty, so the next
 * send_udp_gossip carries only fresh value. Older ACKs are ignored.
 *
 * MSG: the peer retransmits unacked value cumulatively (its
 * [min_seqno, seqno] range). Sequence numbers decide how much of it is new:
 *   - min_seqno > seen_seqno: everything in it is new, so add the full
 *     value/weight;
 *   - seqno > seen_seqno: it overlaps what we already counted, so add only the
 *     difference from the previously saved incoming value/weight;
 *   - otherwise it is a duplicate (handling not visible in this excerpt).
 * The first two cases record the new seqno/value/weight and send an ACK.
 *
 * NOTE(review): the statements around the ACK branch (the comment mentions
 * resetting awol), the duplicate branch and the return value are not visible
 * here — confirm.
 */
259 int recv_gossip(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) {
260 if (msg->type == ACK) {
261 /* If ACK was received then reset the awol count */
262 if (msg->seqno == remote->outgoing.next_seqno - 1) {
263 /* Ack for most recent message. Clear saved state. */
264 remote->outgoing.first_seqno = remote->outgoing.next_seqno;
265 remote->outgoing.saved_value = 0;
266 remote->outgoing.saved_weight = 0;
270 /* Ignore ack if it isn't for most recent message. */
271 } else if (msg->type == MSG) {
272 if (msg->min_seqno > remote->incoming.seen_seqno) {
273 /* Entirely new information */
274 remote->incoming.seen_seqno = msg->seqno;
275 remote->incoming.saved_value = msg->value;
276 remote->incoming.saved_weight = msg->weight;
277 comm->gossip.value += msg->value;
278 comm->gossip.weight += msg->weight;
279 send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
282 else if (msg->seqno > remote->incoming.seen_seqno) {
283 /* Only some of the message is old news. */
284 double diff_value = msg->value - remote->incoming.saved_value;
285 double diff_weight = msg->weight - remote->incoming.saved_weight;
287 remote->incoming.seen_seqno = msg->seqno;
288 remote->incoming.saved_value = msg->value;
289 remote->incoming.saved_weight = msg->weight;
291 comm->gossip.value += diff_value;
292 comm->gossip.weight += diff_weight;
293 send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
297 /* The entire message is old news. (Duplicate). */
/*
 * find_gossip_target: return the next candidate peer index from comm->indices.
 *
 * Indices are handed out in order starting at comm->shuffle_index. Once all
 * remote_node_count of them have been used, comm->indices is reshuffled in
 * place, shuffle_index restarts at 0, and the first index of the new order is
 * returned. shuffle_index is advanced on every call. (The `} else {` that
 * separates the two halves is not visible in this excerpt; this description
 * assumes it.)
 *
 * Returns an index into comm->remote_limiters; that is how send_udp_gossip
 * uses it.
 *
 * NOTE(review): the shuffle draws l from the whole array [0, count) for every
 * k. That naive swap loop does not yield a uniform permutation. Fisher-Yates
 * draws l from [k, count) instead.
 * NOTE(review): when remote_node_count is 0, the reshuffle branch still reads
 * comm->indices[0]. That may be out of bounds depending on how indices is
 * allocated (not visible here).
 */
305 int find_gossip_target(comm_t *comm) {
306 int target = NULL_PEER;
309 if (comm->shuffle_index < comm->remote_node_count) {
310 target = comm->indices[comm->shuffle_index];
311 printlog(LOG_DEBUG,"GOSSIP: found index %d.\n", target);
312 comm->shuffle_index++;
315 // shuffle the remote_limiters array
/* (Only the comm->indices permutation is shuffled; remote_limiters itself is
 * not reordered.) */
316 printlog(LOG_DEBUG, "GOSSIP: shuffling the array.\n");
317 for ( k = 0; k < comm->remote_node_count; k++) {
318 uint32_t l = myrand() % comm->remote_node_count;
320 t = comm->indices[l];
321 comm->indices[l] = comm->indices[k];
322 comm->indices[k] = t;
324 comm->shuffle_index = 0;
325 target = comm->indices[comm->shuffle_index];
326 printlog(LOG_DEBUG,"GOSSIP: found index after spilling over %d.\n", target);
327 comm->shuffle_index++;
/*
 * send_udp_gossip: gossip-fabric send. Splits the local gossip value and
 * weight into (gossip_branch + 1) equal shares and sends one share to each of
 * up to gossip_branch peers.
 *
 * Peer selection for each of the gossip_branch slots:
 *   1. Prefer a REACHABLE peer with awol > 0, i.e. one with unacked messages.
 *   2. Otherwise draw up to 10 candidates from find_gossip_target(). A
 *      candidate is rejected if:
 *      - it was already chosen in this round (comm->selected);
 *      - it is unreachable;
 *      - it is not in our view with confidence IN.
 *   3. If no peer qualifies, that slot's share is kept locally by bumping
 *      message_portion.
 *
 * Each message carries:
 *   - the new share plus the peer's still-unacked saved_value/saved_weight;
 *   - seqno = next_seqno and min_seqno = first_seqno, so the receiver can
 *     tell how much is new.
 * The share is then added to the saved outgoing state.
 *
 * Afterwards the local value/weight become share * message_portion: one share
 * plus any shares that could not be delivered to a peer.
 *
 * NOTE(review): this excerpt does not show:
 *   - the declarations of i, j, msg, retry_index and rand_count;
 *   - the retry-loop advance/break and the rand_count increment;
 *   - the awol increment;
 *   - the return.
 */
332 int send_udp_gossip(comm_t *comm, uint32_t id, int sock) {
336 remote_limiter_t *remote;
337 struct sockaddr_in toaddr;
338 double msg_value, msg_weight;
340 /* This is the factor for the portion of value/weight to keep locally.
341 * Normally this is 1, meaning that we retain the same amount of value/weight
342 * that was sent to the peers. In the case of not being able to send to a
343 * peer though, we increment this to reclaim the value/weight locally. */
344 int message_portion = 1;
346 memset(&toaddr, 0, sizeof(struct sockaddr_in));
347 toaddr.sin_family = AF_INET;
349 msg_value = comm->gossip.value / (comm->gossip.gossip_branch + 1);
350 msg_weight = comm->gossip.weight / (comm->gossip.gossip_branch + 1);
352 for (i = 0; i < comm->gossip.gossip_branch; ++i) {
353 int targetid = NULL_PEER;
357 printlog(LOG_DEBUG, "Gossip loop iteration, i=%d, branch=%d\n", i, comm->gossip.gossip_branch);
359 /* If there are any peers with unacked messages, select them first. */
360 while (retry_index < comm->remote_node_count) {
361 if (comm->remote_limiters[retry_index].awol > 0 && comm->remote_limiters[retry_index].reachability == REACHABLE) {
362 targetid = retry_index;
363 printlog(LOG_DEBUG, "GOSSIP: Selected peerindex %d because it had unacked messages.\n", targetid);
369 while (targetid == NULL_PEER && rand_count < 10) {
370 /* Select a recipient from a randomly-shuffled array. */
371 targetid = find_gossip_target(comm);
373 assert(targetid != NULL_PEER);
375 /* Don't select an already-used index. */
376 for (j = 0; j < i; ++j) {
377 if (targetid == comm->selected[j]) {
378 printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d. selected[j=%d] is %d\n", targetid, j, comm->selected[j]);
379 targetid = NULL_PEER;
384 /* Don't select an unreachable peer or one that is not in our view. */
385 if (targetid != NULL_PEER) {
386 if (comm->remote_limiters[targetid].reachability != REACHABLE ||
387 comm->remote_limiters[targetid].view != comm->gossip.view ||
388 comm->remote_limiters[targetid].view_confidence != IN) {
389 printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d, reachability is %d, remote view is %d (confidence:%d), my view is %d\n",
390 targetid, comm->remote_limiters[targetid].reachability, comm->remote_limiters[targetid].view,
391 comm->remote_limiters[targetid].view_confidence, comm->gossip.view);
392 targetid = NULL_PEER;
399 if (targetid == NULL_PEER) {
400 /* Couldn't find a suitable peer to send to... */
401 message_portion += 1;
402 printlog(LOG_DEBUG, "GOSSIP: exhausted random peer search.\n");
405 printlog(LOG_DEBUG, "GOSSIP: settled on peer id %d.\n", targetid);
408 remote = &comm->remote_limiters[targetid];
409 comm->selected[i] = targetid;
411 toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
412 toaddr.sin_port = remote->port;
414 memset(&msg, 0, sizeof(message_t));
415 msg.magic = MAGIC_MSG;
/* Retransmit any unacked value/weight together with the new share. */
417 msg.value = msg_value + remote->outgoing.saved_value;
418 msg.weight = msg_weight + remote->outgoing.saved_weight;
419 msg.seqno = remote->outgoing.next_seqno;
420 msg.min_seqno = remote->outgoing.first_seqno;
422 msg.view = comm->gossip.view;
424 remote->outgoing.next_seqno++;
425 remote->outgoing.saved_value += msg_value;
426 remote->outgoing.saved_weight += msg_weight;
427 /* Represents the number of messages we have sent to this peer without hearing a message or ACK from it. */
431 #ifdef ALLOW_PARTITION
/* NOTE(review): partition_set is an int, so (1 << targetid) is undefined
 * behavior once targetid reaches the bit width of int. */
433 if (do_partition && ((partition_set & (1 << targetid)) == 0)) {
434 printlog(LOG_DEBUG, "Partition: GOSSIP: ignoring targetid %d\n", targetid);
440 message_to_nbo(&msg);
442 if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
443 printlog(LOG_WARN, "WARN: limiter_send_gossip: sento failed.\n");
/* Keep one share, plus every share that found no recipient. */
448 comm->gossip.value = msg_value * message_portion;
449 comm->gossip.weight = msg_weight * message_portion;
457 static void hello_to_hbo(hello_t *hello) {
458 hello->magic = ntohl(hello->magic);
459 hello->ident_id = ntohl(hello->ident_id);
460 hello->port = ntohs(hello->port);
463 static void hello_to_nbo(hello_t *hello) {
464 hello->magic = htonl(hello->magic);
465 hello->ident_id = htonl(hello->ident_id);
466 hello->port = htons(hello->port);
/*
 * is_connected: test whether remote->socket currently has a connected peer,
 * by asking getpeername() for the peer address.
 *
 * NOTE(review): the return statements are not visible in this excerpt.
 * Presumably the function returns non-zero when getpeername() succeeds and 0
 * otherwise — confirm. Callers use it as a boolean.
 */
469 static int is_connected(remote_limiter_t *remote) {
470 struct sockaddr_in addr;
471 socklen_t addrlen = sizeof(addr);
473 if (getpeername(remote->socket, (struct sockaddr *) &addr, &addrlen) == 0)
/*
 * send_tcp_gossip: TCP variant of the gossip send.
 *
 * Sends one (gossip_branch + 1)-th share of the identity's gossip value and
 * weight to each of gossip_branch peers, then divides the local value and
 * weight by (gossip_branch + 1). Peers are drawn with myrand() % count,
 * independently per send, so the same peer may be picked more than once. A
 * send() failure removes that socket from ident->fds.
 * logfile and unused are not referenced in the visible lines.
 *
 * NOTE(review): if remote_node_count is 0, the modulo below divides by zero
 * (undefined behavior).
 * NOTE(review): the local value/weight are divided even when a send fails.
 * If the failure path skips that peer (not fully visible here), its share is
 * lost rather than kept.
 * NOTE(review): TCP send() may legitimately write fewer bytes than requested.
 * The assert treats that as impossible.
 */
479 int send_tcp_gossip(comm_ident_t *ident, FILE *logfile, int unused) {
480 int i, targetid, sock;
484 memset(&msg, 0, sizeof(message_t));
485 msg.magic = MAGIC_MSG;
486 msg.ident_id = ident->ident_id;
487 msg.value = ident->gossip.value / (ident->gossip.gossip_branch + 1);
488 msg.weight = ident->gossip.weight / (ident->gossip.gossip_branch + 1);
490 message_to_nbo(&msg);
492 for (i = 0; i < ident->gossip.gossip_branch; ++i) {
493 targetid = myrand() % ident->remote_node_count;
494 sock = ident->remote_limiters[targetid].socket;
496 result = send(sock, &msg, sizeof(message_t), 0);
499 FD_CLR(sock, &ident->fds);
504 assert(result == sizeof(message_t));
507 ident->gossip.value /= (ident->gossip.gossip_branch + 1);
508 ident->gossip.weight /= (ident->gossip.gossip_branch + 1);
/*
 * limiter_accept_thread: pthread entry point for accepting peer TCP
 * connections.
 *
 * It first blocks SIGHUP in this thread, leaving SIGHUP to other threads. It
 * then enables deferred cancellation, so the thread can only be cancelled at
 * cancellation points such as the pthread_testcancel() in limiter_accept.
 * Finally it calls limiter_accept() with the argument, which is expected to
 * be a comm_limiter_t *.
 *
 * NOTE(review): the lines around the limiter_accept() call are not visible,
 * including any enclosing loop and the return value — confirm.
 */
513 void *limiter_accept_thread(void *limiter) {
514 sigset_t signal_mask;
516 sigemptyset(&signal_mask);
517 sigaddset(&signal_mask, SIGHUP);
518 pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
520 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
521 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
523 limiter_accept((comm_limiter_t *) limiter);
/*
 * ident_receive_thread: per-identity thread entry point (TCP fabric).
 *
 * Under the limiter rwlock (read) and ident->lock, the thread does the
 * following for every remote limiter that is not yet connected:
 *   1. open a TCP socket and connect it to the remote's addr/port (both used
 *      as stored);
 *   2. send a hello_t carrying MAGIC_HELLO, our ident_id and our listening
 *      port;
 *   3. record the socket in remote_limiters[i].socket and add it to
 *      ident->fds.
 * It then calls ident_receive() to read from those sockets.
 *
 * Port encoding: hello.port = ntohs(port) followed by hello_to_nbo()'s
 * htons() cancels out, so the wire carries args->port's original bytes.
 * limiter_accept undoes it the same way (hello_to_hbo's ntohs and then
 * another ntohs). NOTE(review): presumably args->port is in network byte
 * order, like remote_limiters[].port — confirm.
 *
 * NOTE(review): connect() and send() can block while both locks are held.
 * The locks are also released rwlock-first here, while the rest of the file
 * releases the mutex first.
 * NOTE(review): the declarations of hello, i, sock and result, the socket()/
 * connect()/send() error paths, and any loop around the connect/receive cycle
 * are not visible in this excerpt.
 */
527 void *ident_receive_thread(void *recv_args) {
529 struct recv_thread_args *args = (struct recv_thread_args *) recv_args;
530 comm_ident_t *ident = args->ident;
531 pthread_rwlock_t *lock = args->lock;
532 uint16_t port = args->port;
533 struct sockaddr_in addr;
534 socklen_t addrlen = sizeof(addr);
540 memset(&hello, 0, sizeof(hello_t));
542 /*Try to connect to all remote nodes if they aren't already connected.*/
543 pthread_rwlock_rdlock(lock);
544 pthread_mutex_lock(&ident->lock);
546 hello.magic = MAGIC_HELLO;
547 hello.ident_id = ident->ident_id;
548 hello.port = ntohs(port);
550 hello_to_nbo(&hello);
552 for (i = 0; i < ident->remote_node_count; ++i) {
553 if (is_connected(&ident->remote_limiters[i]))
554 continue; /* Ignore if it's already connected */
556 sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
564 memset(&addr, 0, sizeof(struct sockaddr_in));
565 addr.sin_family = AF_INET;
566 addr.sin_port = ident->remote_limiters[i].port;
567 addr.sin_addr.s_addr = ident->remote_limiters[i].addr;
569 result = connect(sock, (struct sockaddr *) &addr, addrlen);
575 result = send(sock, &hello, sizeof(hello_t), 0);
/* NOTE(review): TCP send() may write fewer bytes than requested; this assert
 * treats a short write as impossible. */
581 assert(result == sizeof(hello_t));
583 ident->remote_limiters[i].socket = sock;
584 printf("Connected on socket: %d\n", sock);
585 FD_SET(sock, &ident->fds);
588 pthread_rwlock_unlock(lock);
589 pthread_mutex_unlock(&ident->lock);
591 ident_receive(ident, lock);
/*
 * limiter_accept: accept one incoming peer TCP connection and attach it to
 * the matching remote limiter.
 *
 * Visible flow:
 *   1. accept() a connection and recv() its hello_t. If the recv fails, the
 *      connection is ignored.
 *   2. Convert the hello to host byte order and check MAGIC_HELLO.
 *   3. Build the remote_node_t key from the peer's source address and the
 *      port advertised in the hello.
 *   4. Under limiter->rwlock (read), map hello.ident_id to an identity handle,
 *      then under ident->lock look up the remote by that key.
 *   5. If the remote is already connected, the new socket is not used.
 *      Otherwise it is stored in remote->socket and added to ident->fds.
 *
 * Port encoding: hello_to_hbo() applies ntohs() and the extra ntohs() below
 * applies it again. The two cancel, so sender.port keeps the wire bytes,
 * matching what ident_receive_thread sends.
 *
 * NOTE(review): this excerpt does not show:
 *   - the declarations of sock, result, hello and ident;
 *   - the accept() error check;
 *   - whether sock is closed on the early-return paths.
 * Each early return may leak the accepted socket — confirm.
 * NOTE(review): TCP recv() may return a partial hello, and the hello comes
 * from an untrusted peer. Both asserts below abort the process on bad input
 * rather than rejecting the connection.
 */
596 static void limiter_accept(comm_limiter_t *limiter) {
598 struct sockaddr_in fromaddr;
599 socklen_t fromlen = sizeof(fromaddr);
600 remote_node_t sender;
601 remote_limiter_t *remote;
604 dent_handle *handle = NULL;
606 sock = accept(limiter->tcp_socket, (struct sockaddr *)&fromaddr, &fromlen);
610 memset(&hello, 0, sizeof(hello_t));
611 result = recv(sock, &hello, sizeof(hello_t), 0);
615 return; /* Failure - ignore it. */
618 assert(result == sizeof(hello_t));
620 hello_to_hbo(&hello);
622 assert(hello.magic == MAGIC_HELLO);
624 memset(&sender, 0, sizeof(remote_node_t));
625 sender.addr = fromaddr.sin_addr.s_addr;
626 sender.port = ntohs(hello.port);
628 pthread_testcancel();
630 pthread_rwlock_rdlock(&limiter->rwlock);
632 handle = map_search(limiter->ident_id_to_handle, (void *) &hello.ident_id, sizeof(hello.ident_id));
634 if (handle == NULL) {
635 printlog(LOG_WARN, "WARN:recvd hello for unknown identity.\n");
636 pthread_rwlock_unlock(&limiter->rwlock);
640 ident = limiter->identities[*handle];
641 assert(ident != NULL);
643 pthread_mutex_lock(&ident->lock);
645 remote = map_search(ident->remote_node_map, &sender, sizeof(remote_node_t));
647 if (remote == NULL) {
648 printlog(LOG_WARN, "WARN: Accepted connection from unknown identity.\n");
649 pthread_mutex_unlock(&ident->lock);
650 pthread_rwlock_unlock(&limiter->rwlock);
655 if (is_connected(remote)) {
656 /* We are still connected, don't need the new socket. */
658 pthread_mutex_unlock(&ident->lock);
659 pthread_rwlock_unlock(&limiter->rwlock);
663 /* We weren't connected, but we are now... */
664 remote->socket = sock;
665 printf("Got connection on: %d\n", sock);
666 FD_SET(sock, &ident->fds);
668 pthread_mutex_unlock(&ident->lock);
669 pthread_rwlock_unlock(&limiter->rwlock);
/*
 * read_tcp_message: read one message_t from a connected peer socket and fold
 * it into the identity's gossip state.
 *
 * On the recv() failure path, the socket is removed from ident->fds under
 * limiter_rwlock (read) and ident->lock. Otherwise the message is converted
 * to host byte order and its magic checked. Then, under the same two locks,
 * one comm_fabric case adds the message's value and weight to ident->gossip,
 * and any unexpected fabric hits assert(1 == 0).
 *
 * NOTE(review): this excerpt does not show:
 *   - the declarations of msg and result;
 *   - the failure condition and return around the FD_CLR;
 *   - the case labels.
 * NOTE(review): TCP recv() may return a partial message, and the data comes
 * from a remote peer. Both asserts abort the process on such input instead
 * of dropping the message or the connection.
 */
672 static void read_tcp_message(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock, int sock) {
676 memset(&msg, 0, sizeof(message_t));
678 result = recv(sock, &msg, sizeof(message_t), 0);
681 pthread_rwlock_rdlock(limiter_rwlock);
682 pthread_mutex_lock(&ident->lock);
683 FD_CLR(sock, &ident->fds);
685 pthread_mutex_unlock(&ident->lock);
686 pthread_rwlock_unlock(limiter_rwlock);
690 assert(result == sizeof(message_t));
692 message_to_hbo(&msg);
693 assert(msg.magic == MAGIC_MSG);
695 pthread_rwlock_rdlock(limiter_rwlock);
696 pthread_mutex_lock(&ident->lock);
698 switch (ident->comm_fabric) {
700 ident->gossip.value += msg.value;
701 ident->gossip.weight += msg.weight;
706 assert(1 == 0); /* This case shouldn't happen. Punt for now... */
709 pthread_mutex_unlock(&ident->lock);
710 pthread_rwlock_unlock(limiter_rwlock);
713 static void ident_receive(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock) {
714 int select_result, i;
716 struct timeval timeout;
722 pthread_rwlock_rdlock(limiter_rwlock);
723 pthread_mutex_lock(&ident->lock);
724 memcpy(&fds_copy, &ident->fds, sizeof(fd_set));
725 pthread_mutex_unlock(&ident->lock);
726 pthread_rwlock_unlock(limiter_rwlock);
728 /* mask interrupt signals for this thread? */
730 select_result = select(FD_SETSIZE, &fds_copy, NULL, NULL, &timeout);
732 assert(select_result >= 0);
734 if (select_result == 0)
735 return; /* Timed out */
737 for (i = 0; (i < FD_SETSIZE) && select_result; ++i) {
738 if (FD_ISSET(i, &fds_copy)) {
739 read_tcp_message(ident, limiter_rwlock, i);