1 /* See the DRL-LICENSE file for this file's software license. */
3 #define _XOPEN_SOURCE 600
9 /* Socket functions. */
10 #include <sys/types.h>
11 #include <sys/socket.h>
13 /* Byte ordering and address structures. */
14 #include <arpa/inet.h>
19 /* close() & usleep */
22 /* Mutex lock/unlock. */
28 /* select() w/ timeout */
29 #include <sys/select.h>
35 /* sigaddset(), sigemptyset(), SIGHUP, etc. */
38 /* DRL data structures. */
39 #include "raterouter.h"
40 #include "ratetypes.h"
41 #include "drl_state.h"
42 #include "peer_comm.h"
45 /* Artificially creates a network partition (testing aid). */
/* Bitmask tested by the ALLOW_PARTITION code in send_udp_mesh() and
 * send_udp_gossip(); a clear bit marks a peer that should not be sent to.
 * 0xfffffff sets the low 28 bits. */
47 int partition_set = 0xfffffff;
/* Limiter state; declared extern here, so it is defined in another file. */
49 extern limiter_t limiter;
/* Magic values written into message_t.magic and hello_t.magic by the senders
 * in this file and asserted by the receive paths in this file. */
51 static const uint32_t MAGIC_MSG = 0x123123;
52 static const uint32_t MAGIC_HELLO = 0x456456;
/* message_t.type values. limiter_receive() compares type against ACK; MSG is
 * presumably the ordinary data-message type (no use is visible here). */
53 static const uint16_t MSG = 1;
54 static const uint16_t ACK = 2;
/* Converts the integer fields of *msg from network byte order to host byte
 * order, in place: magic, ident_id, seqno and min_seqno via ntohl(), type via
 * ntohs(). Intended for a message that has just been received. */
56 static void message_to_hbo(message_t *msg) {
57 msg->magic = ntohl(msg->magic);
58 msg->ident_id = ntohl(msg->ident_id);
59 msg->seqno = ntohl(msg->seqno);
60 msg->min_seqno = ntohl(msg->min_seqno);
61 msg->type = ntohs(msg->type);
62 /* value is a double: no byte-order conversion is applied to it. */
63 /* weight is a double: no byte-order conversion is applied to it. */
/* Converts the integer fields of *msg from host byte order to network byte
 * order, in place: magic, ident_id, seqno and min_seqno via htonl(), type via
 * htons(). Called on a message just before it is sent. */
66 static void message_to_nbo(message_t *msg) {
67 msg->magic = htonl(msg->magic);
68 msg->ident_id = htonl(msg->ident_id);
69 msg->seqno = htonl(msg->seqno);
70 msg->min_seqno = htonl(msg->min_seqno);
71 msg->type = htons(msg->type);
72 /* value is a double: no byte-order conversion is applied to it. */
73 /* weight is a double: no byte-order conversion is applied to it. */
/* Converts a received hello_t from network to host byte order, in place:
 * magic and ident_id via ntohl(), port via ntohs(). */
76 static void hello_to_hbo(hello_t *hello) {
77 hello->magic = ntohl(hello->magic);
78 hello->ident_id = ntohl(hello->ident_id);
79 hello->port = ntohs(hello->port);
/* Converts an outgoing hello_t from host to network byte order, in place:
 * magic and ident_id via htonl(), port via htons(). */
82 static void hello_to_nbo(hello_t *hello) {
83 hello->magic = htonl(hello->magic);
84 hello->ident_id = htonl(hello->ident_id);
85 hello->port = htons(hello->port);
/* Probes whether remote->socket has a connected peer by calling
 * getpeername() on it. Callers treat a nonzero result as "connected";
 * the return statements themselves are not visible in this view, so
 * presumably success of getpeername() maps to nonzero -- confirm. */
88 static int is_connected(remote_limiter_t *remote) {
89 struct sockaddr_in addr;
90 socklen_t addrlen = sizeof(addr);
/* getpeername() returns 0 only when the socket is connected. */
92 if (getpeername(remote->socket, (struct sockaddr *) &addr, &addrlen) == 0)
/* Sends a message_t carrying MAGIC_MSG and ident->id to `remote` over the
 * limiter's UDP socket, logging a warning if sendto() fails.
 *
 * ident  - identity whose id is stamped into the message.
 * remote - destination; its addr and port are copied into the sockaddr
 *          unchanged.
 * seqno  - sequence number being acknowledged. NOTE(review): the lines that
 *          store seqno and the message type are not visible here; presumably
 *          they set msg.seqno = seqno and msg.type = ACK -- confirm.
 *
 * The return value is produced on lines not visible in this view. */
98 static int send_ack(identity_t *ident, remote_limiter_t *remote, uint32_t seqno) {
101 struct sockaddr_in toaddr;
103 memset(&toaddr, 0, sizeof(struct sockaddr_in));
104 toaddr.sin_family = AF_INET;
106 toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
107 toaddr.sin_port = remote->port;
/* Zero the whole message so unset fields go out as 0. */
109 memset(&msg, 0, sizeof(msg));
110 msg.magic = MAGIC_MSG;
111 msg.ident_id = ident->id;
/* Integer fields must be in network byte order on the wire. */
115 message_to_nbo(&msg);
117 if (sendto(limiter.udp_socket, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
118 printlog(LOG_WARN, "send_ack: sento failed.\n");
/* Receives one message_t datagram on limiter.udp_socket and applies it to the
 * state of the identity and remote limiter it belongs to.
 *
 * Visible steps:
 *  - recvfrom() one message; a short or failed read is logged.
 *  - Build a remote_node_t lookup key from the datagram's source address and
 *    port (both left in network byte order).
 *  - Convert the message to host byte order and assert its magic.
 *  - Under a read lock on limiter.limiter_lock, look up the identity by
 *    msg.ident_id; under ident->comm.lock, look up the sender in the
 *    identity's remote_node_map. Unknown identities/senders are logged and
 *    the locks released.
 *  - Dispatch on ident->comm.comm_fabric (the case labels are not visible in
 *    this view): one branch stores msg.value as the sender's rate, another
 *    handles ACKs and sequence-numbered value/weight updates, and the
 *    fallback logs an unknown fabric.
 *  - Release both locks.
 *
 * NOTE(review): the assert on msg.magic fires on any datagram with a wrong
 * magic, i.e. on untrusted network input. Consider logging and dropping the
 * message instead; not changed here because the surrounding control-flow
 * lines are not visible. */
125 void limiter_receive() {
126 struct sockaddr_in fromaddr;
127 remote_node_t sender;
128 socklen_t fromlen = sizeof(fromaddr);
129 identity_t *ident = NULL;
130 remote_limiter_t *remote = NULL;
133 if (recvfrom(limiter.udp_socket, &msg, sizeof(msg), MSG_WAITALL, (struct sockaddr *) &fromaddr, (socklen_t *) &fromlen) != sizeof(msg)) {
134 /* recv failed. Log and continue. */
135 printlog(LOG_WARN, "recv failed to read full message.\n");
/* The key is zeroed first so that any padding bytes compare equal when the
 * whole struct is used as a map key below. */
138 memset(&sender, 0, sizeof(remote_node_t));
139 sender.addr = fromaddr.sin_addr.s_addr;
140 sender.port = fromaddr.sin_port;
142 message_to_hbo(&msg);
144 assert(msg.magic == MAGIC_MSG);
/* sizeof yields a size_t; it is cast to int so that it matches the %d
 * conversion in the format string. */
147 printlog(LOG_WARN, "Rcvd (value, weight) : (%f, %f) from ident %d (net order host 0x%x port %d) key size(%d)\n",
148 msg.value, msg.weight, msg.ident_id, sender.addr,sender.port,(int) sizeof(remote_node_t));
/* Cancellation point before any lock is taken. */
150 pthread_testcancel();
152 pthread_rwlock_rdlock(&limiter.limiter_lock);
154 ident = map_search(limiter.stable_instance.ident_map, &msg.ident_id,
155 sizeof(msg.ident_id));
158 printlog(LOG_WARN, "WARN:recvd message for unknown identity.\n");
159 pthread_rwlock_unlock(&limiter.limiter_lock);
163 pthread_mutex_lock(&ident->comm.lock);
165 remote = map_search(ident->comm.remote_node_map, &sender, sizeof(remote_node_t));
167 if (remote == NULL) {
168 printlog(LOG_WARN, "WARN: recvd msg from unknown entity.\n");
169 pthread_mutex_unlock(&ident->comm.lock);
170 pthread_rwlock_unlock(&limiter.limiter_lock);
174 switch (ident->comm.comm_fabric) {
176 /* Use the message's value to be our new GRDrate/FPSweight for the
177 * message's sender. */
178 remote->rate = msg.value;
180 /* Reset the AWOL counter to zero since we received an update. */
186 if (msg.type == ACK) {
187 if (msg.seqno == remote->outgoing.next_seqno - 1) {
190 /* Ack for most recent message. Clear saved state. */
191 remote->outgoing.first_seqno = remote->outgoing.next_seqno;
192 remote->outgoing.saved_value = 0;
193 remote->outgoing.saved_weight = 0;
/* Mark every retry slot that points at this remote with -2. */
195 for (i = 0; i < ident->comm.gossip.gossip_branch; ++i) {
196 if (ident->comm.retrys[i] >= 0 &&
197 remote == &ident->comm.remote_limiters[ident->comm.retrys[i]]) {
198 ident->comm.retrys[i] = -2;
202 /* Ignore ack if it isn't for most recent message. */
204 if (msg.min_seqno > remote->incoming.seen_seqno) {
205 /* Entirely new information: add the full value/weight. */
206 remote->incoming.seen_seqno = msg.seqno;
207 remote->incoming.saved_value = msg.value;
208 remote->incoming.saved_weight = msg.weight;
209 ident->comm.gossip.value += msg.value;
210 ident->comm.gossip.weight += msg.weight;
211 send_ack(ident, remote, msg.seqno);
213 } else if (msg.seqno > remote->incoming.seen_seqno) {
214 /* Only some of the message is old news: add just the part beyond
 * what was already saved from this remote. */
215 double diff_value = msg.value - remote->incoming.saved_value;
216 double diff_weight = msg.weight - remote->incoming.saved_weight;
218 remote->incoming.seen_seqno = msg.seqno;
219 remote->incoming.saved_value = msg.value;
220 remote->incoming.saved_weight = msg.weight;
222 ident->comm.gossip.value += diff_value;
223 ident->comm.gossip.weight += diff_weight;
224 send_ack(ident, remote, msg.seqno);
227 /* The entire message is old news. (Duplicate). */
235 printlog(LOG_CRITICAL, "ERR: Unknown identity comm fabric.\n");
/* Release in the reverse order of acquisition. */
239 pthread_mutex_unlock(&ident->comm.lock);
240 pthread_rwlock_unlock(&limiter.limiter_lock);
/* Accepts one TCP connection on limiter->tcp_socket, reads the peer's
 * hello_t, and attaches the new socket to the matching remote limiter.
 *
 * Visible steps:
 *  - accept() the connection and recv() a hello_t from it (the failure check
 *    before the early return is not visible in this view).
 *  - Convert the hello to host byte order and assert its magic.
 *  - Build a remote_node_t key from the peer's address and the port carried
 *    in the hello.
 *  - Under a read lock on limiter->rwlock, map hello.ident_id to a handle and
 *    then to an identity; under ident->lock, look up the sender.
 *  - If that remote already has a connected socket, keep the old one;
 *    otherwise store the new socket and add it to ident->fds.
 *
 * NOTE(review): the parameter `limiter` shadows the file-scope `limiter`.
 * NOTE(review): both asserts act on data read from the network (recv length
 * and hello.magic); a short read or bad peer aborts the process. */
244 static void limiter_accept(comm_limiter_t *limiter) {
246 struct sockaddr_in fromaddr;
247 socklen_t fromlen = sizeof(fromaddr);
248 remote_node_t sender;
249 remote_limiter_t *remote;
252 ident_handle *handle = NULL;
254 sock = accept(limiter->tcp_socket, (struct sockaddr *)&fromaddr, &fromlen);
258 memset(&hello, 0, sizeof(hello_t));
259 result = recv(sock, &hello, sizeof(hello_t), 0);
263 return; /* Failure - ignore it. */
266 assert(result == sizeof(hello_t));
268 hello_to_hbo(&hello);
270 assert(hello.magic == MAGIC_HELLO);
/* Zero the key first so padding bytes compare equal in the map lookup. */
272 memset(&sender, 0, sizeof(remote_node_t));
273 sender.addr = fromaddr.sin_addr.s_addr;
/* NOTE(review): hello.port was already converted to host order by
 * hello_to_hbo() above, so this ntohs() swaps it again, yielding the
 * network-order value (same effect as htons()). Presumably the map key
 * stores ports in network order -- confirm. */
274 sender.port = ntohs(hello.port);
/* Cancellation point before any lock is taken. */
276 pthread_testcancel();
278 pthread_rwlock_rdlock(&limiter->rwlock);
280 handle = map_search(limiter->ident_id_to_handle, (void *) &hello.ident_id, sizeof(hello.ident_id));
282 if (handle == NULL) {
283 printlog(LOG_WARN, "WARN:recvd hello for unknown identity.\n");
284 pthread_rwlock_unlock(&limiter->rwlock);
/* The handle is an index into the limiter's identities array. */
288 ident = limiter->identities[*handle];
289 assert(ident != NULL);
291 pthread_mutex_lock(&ident->lock);
293 remote = map_search(ident->remote_node_map, &sender, sizeof(remote_node_t));
295 if (remote == NULL) {
296 printlog(LOG_WARN, "WARN: Accepted connection from unknown identity.\n");
297 pthread_mutex_unlock(&ident->lock);
298 pthread_rwlock_unlock(&limiter->rwlock);
303 if (is_connected(remote)) {
304 /* We are still connected, don't need the new socket. */
306 pthread_mutex_unlock(&ident->lock);
307 pthread_rwlock_unlock(&limiter->rwlock);
311 /* We weren't connected, but we are now... */
312 remote->socket = sock;
313 printf("Got connection on: %d\n", sock);
/* Register the socket so ident_receive() will select() on it. */
314 FD_SET(sock, &ident->fds);
316 pthread_mutex_unlock(&ident->lock);
317 pthread_rwlock_unlock(&limiter->rwlock);
/* Reads one message_t from TCP socket `sock` and folds it into the identity's
 * gossip state.
 *
 * ident          - identity that owns `sock`.
 * limiter_rwlock - limiter-wide rwlock; taken for reading around every access
 *                  to ident, always before ident->lock.
 * sock           - connected TCP socket reported readable by select().
 *
 * On a condition not visible in this view (presumably recv() failing or
 * returning 0 -- confirm), the socket is removed from ident->fds. Otherwise
 * the read is asserted to be a full message_t, converted to host byte order,
 * its magic asserted, and, in the visible branch of the comm_fabric switch,
 * its value and weight added to ident->gossip. Another branch is asserted
 * unreachable.
 *
 * NOTE(review): the asserts act on network input; a partial TCP read or a bad
 * magic aborts the process. */
320 static void read_tcp_message(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock, int sock) {
324 memset(&msg, 0, sizeof(message_t));
326 result = recv(sock, &msg, sizeof(message_t), 0);
329 pthread_rwlock_rdlock(limiter_rwlock);
330 pthread_mutex_lock(&ident->lock);
/* Stop watching this socket. */
331 FD_CLR(sock, &ident->fds);
333 pthread_mutex_unlock(&ident->lock);
334 pthread_rwlock_unlock(limiter_rwlock);
338 assert(result == sizeof(message_t));
340 message_to_hbo(&msg);
341 assert(msg.magic == MAGIC_MSG);
343 pthread_rwlock_rdlock(limiter_rwlock);
344 pthread_mutex_lock(&ident->lock);
346 switch (ident->comm_fabric) {
/* Accumulate the received share into the local gossip totals. */
348 ident->gossip.value += msg.value;
349 ident->gossip.weight += msg.weight;
354 assert(1 == 0); /* This case shouldn't happen. Punt for now... */
357 pthread_mutex_unlock(&ident->lock);
358 pthread_rwlock_unlock(limiter_rwlock);
/* Waits (with a timeout) for any of the identity's TCP sockets to become
 * readable and calls read_tcp_message() on each one that is.
 *
 * ident          - identity whose fd set (ident->fds) is polled.
 * limiter_rwlock - limiter-wide rwlock, read-locked while ident->fds is
 *                  copied and passed on to read_tcp_message().
 *
 * The fd set is copied under the locks so that select() runs without holding
 * them. The timeout value is set on lines not visible in this view.
 *
 * NOTE(review): select() returns -1 when interrupted by a signal (EINTR),
 * which would trip the assert below; the existing "mask interrupt signals"
 * comment suggests this was an open question. */
361 static void ident_receive(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock) {
362 int select_result, i;
364 struct timeval timeout;
370 pthread_rwlock_rdlock(limiter_rwlock);
371 pthread_mutex_lock(&ident->lock);
/* select() modifies the set it is given, so work on a private copy. */
372 memcpy(&fds_copy, &ident->fds, sizeof(fd_set));
373 pthread_mutex_unlock(&ident->lock);
374 pthread_rwlock_unlock(limiter_rwlock);
376 /* mask interrupt signals for this thread? */
378 select_result = select(FD_SETSIZE, &fds_copy, NULL, NULL, &timeout);
380 assert(select_result >= 0);
382 if (select_result == 0)
383 return; /* Timed out */
/* Scan every possible descriptor; the loop also stops once select_result
 * reaches 0 (presumably it is decremented per handled fd on a line not
 * visible here -- confirm). */
385 for (i = 0; (i < FD_SETSIZE) && select_result; ++i) {
386 if (FD_ISSET(i, &fds_copy)) {
387 read_tcp_message(ident, limiter_rwlock, i);
/* Compiles in the partition-simulation code guarded by
 * #ifdef ALLOW_PARTITION in send_udp_mesh() and send_udp_gossip(). */
394 #define ALLOW_PARTITION
/* Sends the local rate to every remote limiter of an identity over UDP
 * (mesh fabric).
 *
 * comm - communication state: local_rate is the value sent, remote_limiters /
 *        remote_node_count list the destinations.
 * id   - identity id. NOTE(review): not referenced on the visible lines;
 *        presumably stored in msg.ident_id on a hidden line -- confirm.
 * sock - UDP socket to send on.
 *
 * One message is built and converted to network byte order once, then sent
 * to each remote. With ALLOW_PARTITION defined, a remote whose bit in
 * partition_set is clear is logged and, per the existing comment, not sent
 * to; partition_count is the bit index and advances once per remote.
 * A failed sendto() is logged together with its strerror() text.
 * The return value is produced on lines not visible in this view.
 *
 * NOTE(review): (1 << partition_count) is undefined once partition_count
 * reaches the width of int; confirm remote_node_count stays below that. */
396 int send_udp_mesh(comm_t *comm, uint32_t id, int sock) {
398 remote_limiter_t *remote;
400 struct sockaddr_in toaddr;
403 #ifdef ALLOW_PARTITION
405 int partition_count = 0;
411 memset(&toaddr, 0, sizeof(struct sockaddr_in));
412 toaddr.sin_family = AF_INET;
414 memset(&msg, 0, sizeof(message_t));
415 msg.magic = MAGIC_MSG;
417 msg.value = comm->local_rate;
418 /* Do we want seqnos for mesh? We can get by without them. */
420 message_to_nbo(&msg);
422 /* Iterate through and send update to all remote limiters in our identity. */
423 for (i = 0; i < comm->remote_node_count; ++i) {
424 remote = &comm->remote_limiters[i];
426 #ifdef ALLOW_PARTITION
429 printlog(LOG_DEBUG, "Testing partition, partition set is %x, count is %d, test is %d.\n",
430 partition_set, partition_count, partition_set & (1 << partition_count));
431 /* If the partition count bit isn't high in the set, don't actually send anything. */
432 if ((partition_set & (1 << partition_count)) == 0) {
/* remote->addr is already in network byte order, which is what inet_ntoa()
 * expects; converting it with ntohl() first would log a byte-reversed
 * address on little-endian hosts. */
433 dest.s_addr = remote->addr;
434 strcpy(dest_ip, inet_ntoa(dest));
436 printlog(LOG_DEBUG, "Partition: ignoring host %s\n", dest_ip);
438 partition_count += 1;
445 toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
446 toaddr.sin_port = remote->port;
447 if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
448 printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
/* strerror() returns a string, so it must be formatted with %s. */
450 printlog(LOG_WARN, " - The error was |%s|\n", strerror(result));
453 partition_count += 1;
/* Performs one gossip round over UDP: splits the identity's gossip
 * value/weight into (gossip_branch + 1) equal shares and sends one share to
 * each of up to gossip_branch peers.
 *
 * comm - communication state (gossip totals, retry slots, remote limiters).
 * id   - identity id. NOTE(review): not referenced on the visible lines;
 *        presumably stored in msg.ident_id on a hidden line -- confirm.
 * sock - UDP socket to send on.
 *
 * Per slot i:
 *  - If retrys[i] >= 0 the same peer is targeted again, unless its awol
 *    count exceeds the threshold; then the share is reclaimed locally
 *    (message_portion is incremented) and the slot is set to -1.
 *  - Otherwise a peer is picked at random, retrying while
 *    targetid == -2 && rand_count < 50. A candidate is rejected when it
 *    already occupies a retry slot or is AWOL. If the search is exhausted
 *    the share is reclaimed locally.
 *  - The message carries the new share plus everything still saved for that
 *    peer (outgoing.saved_*), with seqno = next_seqno and
 *    min_seqno = first_seqno; the outgoing state is then advanced.
 *  - With ALLOW_PARTITION, a target whose bit in partition_set is clear is
 *    logged as ignored when do_partition is set.
 * Afterwards the local totals become share * message_portion.
 * The return value is produced on lines not visible in this view.
 *
 * NOTE(review): rand_count is declared without an initializer; its
 * initialization/increment are not visible here -- confirm it is set before
 * the while loop reads it.
 * NOTE(review): myrand() % comm->remote_node_count divides by zero if
 * remote_node_count is 0, and (1 << targetid) is undefined for
 * targetid >= the width of int -- confirm callers rule these out. */
459 int send_udp_gossip(comm_t *comm, uint32_t id, int sock) {
461 int awol_threshold = GOSSIP_REMOTE_AWOL_THRESHOLD;
462 int rand_count; //HACK...
464 remote_limiter_t *remote;
465 struct sockaddr_in toaddr;
466 double msg_value, msg_weight;
468 /* This is the factor for the portion of value/weight to keep locally.
469 * Normally this is 1, meaning that we retain the same amount of value/weight
470 * that was sent to the peers. In the case of not being able to send to a
471 * peer though, we increment this to reclaim the value/weight locally. */
472 int message_portion = 1;
474 memset(&toaddr, 0, sizeof(struct sockaddr_in));
475 toaddr.sin_family = AF_INET;
/* One share per peer plus one kept locally. */
477 msg_value = comm->gossip.value / (comm->gossip.gossip_branch + 1);
478 msg_weight = comm->gossip.weight / (comm->gossip.gossip_branch + 1);
480 for (i = 0; i < comm->gossip.gossip_branch; ++i) {
483 printlog(LOG_DEBUG, "Gossip loop iteration, i=%d, branch=%d\n", i, comm->gossip.gossip_branch);
/* A non-negative retry slot names the peer to send to again. */
485 if (comm->retrys[i] >= 0) {
486 remote = &comm->remote_limiters[comm->retrys[i]];
487 targetid = comm->retrys[i];
489 if (remote->awol > awol_threshold) {
490 message_portion += 1;
491 printlog(LOG_DEBUG, "Gossip: Ignoring AWOL peer id %d.\n", comm->retrys[i]);
492 comm->retrys[i] = -1;
/* Random peer selection, bounded to 50 attempts. */
499 while (targetid == -2 && rand_count < 50) {
500 targetid = myrand() % comm->remote_node_count;
503 /* Don't select an already-used index. */
504 for (j = 0; j < comm->gossip.gossip_branch; ++j) {
505 if (targetid == comm->retrys[j] || comm->remote_limiters[targetid].awol > awol_threshold) {
506 printlog(LOG_DEBUG, "Gossip: disqualified targetid %d. retrys[j] is %d, and remote awol count is %d\n", targetid, comm->retrys[j], comm->remote_limiters[targetid].awol);
514 /* Couldn't find a suitable peer to send to... */
515 message_portion += 1;
516 printlog(LOG_DEBUG, "Gossip: exhausted random peer search.\n");
519 printlog(LOG_DEBUG, "Gossip: settled on peer id %d.\n", targetid);
522 remote = &comm->remote_limiters[targetid];
/* Remember the target in this slot; limiter_receive() sets the slot to -2
 * when the matching ACK arrives. */
525 comm->retrys[i] = targetid;
527 toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
528 toaddr.sin_port = remote->port;
530 memset(&msg, 0, sizeof(message_t));
531 msg.magic = MAGIC_MSG;
/* Send the new share together with everything not yet acknowledged. */
533 msg.value = msg_value + remote->outgoing.saved_value;
534 msg.weight = msg_weight + remote->outgoing.saved_weight;
535 msg.seqno = remote->outgoing.next_seqno;
536 msg.min_seqno = remote->outgoing.first_seqno;
539 remote->outgoing.next_seqno++;
540 remote->outgoing.saved_value += msg_value;
541 remote->outgoing.saved_weight += msg_weight;
543 #ifdef ALLOW_PARTITION
545 if (do_partition && ((partition_set & (1 << targetid)) == 0)) {
546 printlog(LOG_DEBUG, "Partition: Gossip: ignoring targetid %d\n", targetid);
552 message_to_nbo(&msg);
554 if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
555 printlog(LOG_WARN, "WARN: limiter_send_gossip: sento failed.\n");
/* Keep one share locally plus one for every peer that could not be used. */
561 comm->gossip.value = msg_value * message_portion;
562 comm->gossip.weight = msg_weight * message_portion;
/* Performs one gossip round over the identity's TCP connections.
 *
 * ident   - identity whose gossip value/weight is shared out.
 * logfile - not referenced on the visible lines.
 * unused  - not referenced on the visible lines.
 *
 * A single message holding 1/(gossip_branch + 1) of the current value and
 * weight is built and converted to network byte order once. For each of
 * gossip_branch iterations a remote is chosen at random (the same remote may
 * be chosen more than once) and the message is sent on its socket. On a
 * condition not visible in this view (presumably send() failing -- confirm)
 * the socket is removed from ident->fds; otherwise the send is asserted to be
 * complete. Finally the local value and weight are divided by
 * (gossip_branch + 1). The return value is produced on lines not visible in
 * this view.
 *
 * NOTE(review): myrand() % ident->remote_node_count divides by zero if the
 * identity has no remotes -- confirm callers rule this out. */
568 int send_tcp_gossip(comm_ident_t *ident, FILE *logfile, int unused) {
569 int i, targetid, sock;
573 memset(&msg, 0, sizeof(message_t));
574 msg.magic = MAGIC_MSG;
575 msg.ident_id = ident->ident_id;
576 msg.value = ident->gossip.value / (ident->gossip.gossip_branch + 1);
577 msg.weight = ident->gossip.weight / (ident->gossip.gossip_branch + 1);
579 message_to_nbo(&msg);
581 for (i = 0; i < ident->gossip.gossip_branch; ++i) {
582 targetid = myrand() % ident->remote_node_count;
583 sock = ident->remote_limiters[targetid].socket;
585 result = send(sock, &msg, sizeof(message_t), 0);
/* Stop watching this socket. */
588 FD_CLR(sock, &ident->fds);
593 assert(result == sizeof(message_t));
/* Keep only the local share of the totals. */
596 ident->gossip.value /= (ident->gossip.gossip_branch + 1);
597 ident->gossip.weight /= (ident->gossip.gossip_branch + 1);
/* Thread entry point for accepting incoming TCP connections.
 *
 * limiter - pointer to a comm_limiter_t, passed as void * to fit the pthread
 *           start-routine signature.
 *
 * Blocks SIGHUP in this thread, enables deferred cancellation, and calls
 * limiter_accept(). Any enclosing loop and the return statement are on lines
 * not visible in this view. */
604 void *limiter_accept_thread(void *limiter) {
605 sigset_t signal_mask;
/* Block SIGHUP in this thread. */
607 sigemptyset(&signal_mask);
608 sigaddset(&signal_mask, SIGHUP);
609 pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
/* Deferred cancellation: the thread is only cancelled at cancellation
 * points such as the pthread_testcancel() call in limiter_accept(). */
611 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
612 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
614 limiter_accept((comm_limiter_t *) limiter);
618 void *ident_receive_thread(void *recv_args) {
620 struct recv_thread_args *args = (struct recv_thread_args *) recv_args;
621 comm_ident_t *ident = args->ident;
622 pthread_rwlock_t *lock = args->lock;
623 uint16_t port = args->port;
624 struct sockaddr_in addr;
625 socklen_t addrlen = sizeof(addr);
631 memset(&hello, 0, sizeof(hello_t));
633 /*Try to connect to all remote nodes if they aren't already connected.*/
634 pthread_rwlock_rdlock(lock);
635 pthread_mutex_lock(&ident->lock);
637 hello.magic = MAGIC_HELLO;
638 hello.ident_id = ident->ident_id;
639 hello.port = ntohs(port);
641 hello_to_nbo(&hello);
643 for (i = 0; i < ident->remote_node_count; ++i) {
644 if (is_connected(&ident->remote_limiters[i]))
645 continue; /* Ignore if it's already connected */
647 sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
655 memset(&addr, 0, sizeof(struct sockaddr_in));
656 addr.sin_family = AF_INET;
657 addr.sin_port = ident->remote_limiters[i].port;
658 addr.sin_addr.s_addr = ident->remote_limiters[i].addr;
660 result = connect(sock, (struct sockaddr *) &addr, addrlen);
666 result = send(sock, &hello, sizeof(hello_t), 0);
672 assert(result == sizeof(hello_t));
674 ident->remote_limiters[i].socket = sock;
675 printf("Connected on socket: %d\n", sock);
676 FD_SET(sock, &ident->fds);
679 pthread_rwlock_unlock(lock);
680 pthread_mutex_unlock(&ident->lock);
682 ident_receive(ident, lock);