1 /* See the DRL-LICENSE file for this file's software license. */
3 /** Allows us to use pthread_rwlocks. */
4 #define _XOPEN_SOURCE 600
12 /* Socket functions. */
13 #include <sys/socket.h>
22 #include <sys/select.h>
26 #include "raterouter.h"
27 #include "ratetypes.h"
28 #include "drl_state.h"
29 #include "peer_comm.h"
32 extern limiter_t limiter;
34 int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) {
37 memset(comm, 0, sizeof(comm_t));
39 comm->comm_fabric = config->commfabric;
40 comm->transport_proto = UDP;
41 comm->remote_node_count = config->peer_count;
42 comm->gossip.gossip_branch = config->branch;
43 comm->gossip.weight = 1.0;
45 pthread_mutex_init(&comm->lock, NULL);
47 /* Set send function. */
48 switch (config->commfabric) {
50 comm->send_function = send_udp_mesh;
53 comm->send_function = send_udp_gossip;
57 comm->remote_node_map = allocate_map();
58 if (comm->remote_node_map == NULL) {
59 pthread_mutex_destroy(&comm->lock);
63 /* Allocate remote_limiters array and fill it in. Add remotes to map. */
64 comm->remote_limiters =
65 malloc(config->peer_count * sizeof(remote_limiter_t));
67 if (comm->remote_limiters == NULL) {
68 pthread_mutex_destroy(&comm->lock);
69 free_map(comm->remote_node_map, 0);
73 memset(comm->remote_limiters, 0, config->peer_count * sizeof(remote_limiter_t));
75 for (i = 0; i < config->peer_count; ++i) {
76 comm->remote_limiters[i].addr = remote_nodes[i].addr;
77 comm->remote_limiters[i].port = remote_nodes[i].port;
78 comm->remote_limiters[i].outgoing.next_seqno = 1;
79 map_insert(comm->remote_node_map, (void *) &(remote_nodes[i]),
80 sizeof(remote_node_t), &comm->remote_limiters[i]);
83 /* Allocate and initialize retrys. */
84 comm->retrys = malloc(config->branch * sizeof(int));
85 if (comm->retrys == NULL) {
86 pthread_mutex_destroy(&comm->lock);
87 free_map(comm->remote_node_map, 0);
88 free(comm->remote_limiters);
92 for (i = 0; i < config->branch; ++i) {
99 void free_comm(comm_t *comm) {
101 if (comm->remote_limiters) {
102 free(comm->remote_limiters);
105 if (comm->remote_nodes) {
106 free(comm->remote_nodes);
109 if (comm->remote_node_map) {
110 free_map(comm->remote_node_map, 0);
113 pthread_mutex_destroy(&comm->lock);
121 int read_comm(comm_t *comm, double *aggregate, double decayto) {
122 remote_limiter_t *remote;
124 pthread_mutex_lock(&comm->lock);
125 if (comm->comm_fabric == COMM_MESH) {
127 map_reset_iterate(comm->remote_node_map);
128 while ((remote = map_next(comm->remote_node_map))) {
129 /* remote->rate corresponds to the rate (GRD) or weight (FPS)
130 * in generated by the peer remote. */
131 *aggregate += remote->rate;
133 /* If we continue to read it without having heard an update,
134 * we start to make the peer's value approach decayto, getting
135 * half of the way there each time. */
136 if (remote->awol >= REMOTE_AWOL_THRESHOLD) {
137 remote->rate += ((decayto - remote->rate) / 2);
142 *aggregate += comm->local_rate;
143 } else if (comm->comm_fabric == COMM_GOSSIP) {
145 value = (comm->gossip.value / comm->gossip.weight);
146 value *= (comm->remote_node_count + 1);
148 /* Keep around the last value so that we don't stupidly pick 0 when
149 * we're negative. If we pick 0, it looks to the limiter like it
150 * has free reign and it will take 100% of the rate allocation for
153 //*aggregate = comm->gossip.last_nonzero;
155 //printf("*****Gossip value is %.3f (%u) ((%d))\n", value, *aggregate, (int) *aggregate);
158 comm->gossip.last_nonzero = *aggregate;
159 //printf("Gossip value is %.3f (%u) ((%d))\n", value, *aggregate, (int) *aggregate);
162 printlog(LOG_CRITICAL, "read_comm: Invalid comm fabric: %d.\n",
164 pthread_mutex_unlock(&comm->lock);
167 pthread_mutex_unlock(&comm->lock);
169 //printf("read: %.3f\n", *aggregate);
174 int write_local_value(comm_t *comm, const double value) {
175 pthread_mutex_lock(&comm->lock);
176 if (comm->comm_fabric == COMM_MESH) {
177 comm->last_local_rate = comm->local_rate;
178 comm->local_rate = value;
179 comm->rate_change = comm->local_rate - comm->last_local_rate;
180 } else if (comm->comm_fabric == COMM_GOSSIP) {
181 comm->last_local_rate = comm->local_rate;
182 comm->local_rate = value;
183 comm->rate_change = comm->local_rate - comm->last_local_rate;
184 /*printf("new: %f, old: %f, weight: %f, diff: %f\n", comm->gossip.value + (comm->gossip.weight * comm->rate_change), comm->gossip.value, comm->gossip.weight, comm->rate_change);*/
185 /*comm->gossip.value = comm->gossip.value + (comm->gossip.weight * comm->rate_change);*/
186 comm->gossip.value += comm->rate_change;
189 printlog(LOG_CRITICAL, "write_local_value: Invalid comm fabric: %d.\n",
191 pthread_mutex_unlock(&comm->lock);
194 pthread_mutex_unlock(&comm->lock);
199 int send_update(comm_t *comm, uint32_t id) {
202 pthread_mutex_lock(&comm->lock);
204 result = comm->send_function(comm, id, limiter.udp_socket);
206 pthread_mutex_unlock(&comm->lock);
211 void *limiter_receive_thread(void *unused) {
212 sigset_t signal_mask;
214 sigemptyset(&signal_mask);
215 sigaddset(&signal_mask, SIGHUP);
216 pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
218 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
219 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);