1 /* See the DRL-LICENSE file for this file's software license. */
3 /** Allows us to use pthread_rwlocks. */
4 #define _XOPEN_SOURCE 600
/* ENOMEM, EINVAL */
#include <errno.h>
/* malloc(), calloc(), free(), NULL */
#include <stdlib.h>
/* memset() */
#include <string.h>
/* Socket functions. */
#include <sys/socket.h>
#include <sys/select.h>
26 #include "raterouter.h"
27 #include "ratetypes.h"
28 #include "drl_state.h"
29 #include "peer_comm.h"
32 extern limiter_t limiter;
34 int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) {
37 memset(comm, 0, sizeof(comm_t));
39 comm->comm_fabric = config->commfabric;
40 comm->transport_proto = UDP;
41 comm->remote_node_count = config->peer_count;
42 comm->gossip.gossip_branch = config->branch;
43 comm->gossip.weight = 1.0;
45 pthread_mutex_init(&comm->lock, NULL);
47 /* Set send function. */
48 switch (config->commfabric) {
50 comm->send_function = send_udp_mesh;
53 comm->send_function = send_udp_gossip;
57 comm->remote_node_map = allocate_map();
58 if (comm->remote_node_map == NULL) {
59 pthread_mutex_destroy(&comm->lock);
63 /* Allocate remote_limiters array and fill it in. Add remotes to map. */
64 comm->remote_limiters =
65 malloc(config->peer_count * sizeof(remote_limiter_t));
67 if (comm->remote_limiters == NULL) {
68 pthread_mutex_destroy(&comm->lock);
69 free_map(comm->remote_node_map, 0);
73 memset(comm->remote_limiters, 0, config->peer_count * sizeof(remote_limiter_t));
75 for (i = 0; i < config->peer_count; ++i) {
76 comm->remote_limiters[i].addr = remote_nodes[i].addr;
77 comm->remote_limiters[i].port = remote_nodes[i].port;
78 comm->remote_limiters[i].outgoing.next_seqno = 1;
79 map_insert(comm->remote_node_map, (void *) &(remote_nodes[i]),
80 sizeof(remote_node_t), &comm->remote_limiters[i]);
83 /* Allocate and initialize retrys. */
84 comm->retrys = malloc(config->branch * sizeof(int));
85 if (comm->retrys == NULL) {
86 pthread_mutex_destroy(&comm->lock);
87 free_map(comm->remote_node_map, 0);
88 free(comm->remote_limiters);
92 for (i = 0; i < config->branch; ++i) {
99 void free_comm(comm_t *comm) {
101 if (comm->remote_limiters) {
102 free(comm->remote_limiters);
105 if (comm->remote_nodes) {
106 free(comm->remote_nodes);
109 if (comm->remote_node_map) {
110 free_map(comm->remote_node_map, 0);
113 pthread_mutex_destroy(&comm->lock);
121 int read_comm(comm_t *comm, double *aggregate, double decayto) {
122 remote_limiter_t *remote;
124 pthread_mutex_lock(&comm->lock);
125 if (comm->comm_fabric == COMM_MESH) {
127 map_reset_iterate(comm->remote_node_map);
128 while ((remote = map_next(comm->remote_node_map))) {
129 /* remote->rate corresponds to the rate (GRD) or weight (FPS)
130 * in generated by the peer remote. */
131 *aggregate += remote->rate;
133 /* If we continue to read it without having heard an update,
134 * we start to make the peer's value approach decayto, getting
135 * half of the way there each time. */
136 if (remote->awol >= REMOTE_AWOL_THRESHOLD) {
137 printlog(LOG_WARN, "AWOL remote limiter detected.\n");
138 remote->rate += ((decayto - remote->rate) / 2);
143 *aggregate += comm->local_rate;
144 } else if (comm->comm_fabric == COMM_GOSSIP) {
146 value = (comm->gossip.value / comm->gossip.weight);
147 value *= (comm->remote_node_count + 1);
149 /* Keep around the last value so that we don't stupidly pick 0 when
150 * we're negative. If we pick 0, it looks to the limiter like it
151 * has free reign and it will take 100% of the rate allocation for
154 //*aggregate = comm->gossip.last_nonzero;
156 //printf("*****Gossip value is %.3f (%u) ((%d))\n", value, *aggregate, (int) *aggregate);
159 comm->gossip.last_nonzero = *aggregate;
160 //printf("Gossip value is %.3f (%u) ((%d))\n", value, *aggregate, (int) *aggregate);
163 printlog(LOG_CRITICAL, "read_comm: Invalid comm fabric: %d.\n",
165 pthread_mutex_unlock(&comm->lock);
168 pthread_mutex_unlock(&comm->lock);
170 //printf("read: %.3f\n", *aggregate);
175 int write_local_value(comm_t *comm, const double value) {
176 pthread_mutex_lock(&comm->lock);
177 if (comm->comm_fabric == COMM_MESH) {
178 comm->last_local_rate = comm->local_rate;
179 comm->local_rate = value;
180 comm->rate_change = comm->local_rate - comm->last_local_rate;
181 } else if (comm->comm_fabric == COMM_GOSSIP) {
182 comm->last_local_rate = comm->local_rate;
183 comm->local_rate = value;
184 comm->rate_change = comm->local_rate - comm->last_local_rate;
185 /*printf("new: %f, old: %f, weight: %f, diff: %f\n", comm->gossip.value + (comm->gossip.weight * comm->rate_change), comm->gossip.value, comm->gossip.weight, comm->rate_change);*/
186 /*comm->gossip.value = comm->gossip.value + (comm->gossip.weight * comm->rate_change);*/
187 comm->gossip.value += comm->rate_change;
190 printlog(LOG_CRITICAL, "write_local_value: Invalid comm fabric: %d.\n",
192 pthread_mutex_unlock(&comm->lock);
195 pthread_mutex_unlock(&comm->lock);
200 int send_update(comm_t *comm, uint32_t id) {
203 pthread_mutex_lock(&comm->lock);
205 result = comm->send_function(comm, id, limiter.udp_socket);
207 pthread_mutex_unlock(&comm->lock);
212 void *limiter_receive_thread(void *unused) {
213 sigset_t signal_mask;
215 sigemptyset(&signal_mask);
216 sigaddset(&signal_mask, SIGHUP);
217 sigaddset(&signal_mask, SIGUSR1);
218 pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
220 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
221 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);