ef57abe0085952a1aef12b2b2d9dadf8c91c2b74
[distributedratelimiting.git] / drl / drl_state.c
1 /* See the DRL-LICENSE file for this file's software license. */
2
3 /** Allows us to use pthread_rwlocks. */
4 #define _XOPEN_SOURCE 600
5
6 /* malloc(), NULL */
7 #include <stdlib.h>
8
9 /* getpid() */
10 #include <unistd.h>
11
12 /* Socket functions. */
13 #include <sys/socket.h>
14
15 /* memset() */
16 #include <string.h>
17
18 /* perror() */
19 #include <errno.h>
20
21 /* FD_ZERO() */
22 #include <sys/select.h>
23
24 #include <assert.h>
25
26 #include "raterouter.h"
27 #include "ratetypes.h"
28 #include "drl_state.h"
29 #include "peer_comm.h"
30 #include "logging.h"
31
32 extern limiter_t limiter;
33
34 int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) {
35     int i;
36
37     memset(comm, 0, sizeof(comm_t));
38
39     comm->comm_fabric = config->commfabric;
40     comm->transport_proto = UDP;
41     comm->remote_node_count = config->peer_count;
42     comm->gossip.gossip_branch = config->branch;
43     comm->gossip.weight = 1.0;
44
45     pthread_mutex_init(&comm->lock, NULL);
46
47     /* Set send function. */
48     switch (config->commfabric) {
49         case COMM_MESH:
50             comm->send_function = send_udp_mesh;
51             break;
52         case COMM_GOSSIP:
53             comm->send_function = send_udp_gossip;
54             break;
55     }
56
57     comm->remote_node_map = allocate_map();
58     if (comm->remote_node_map == NULL) {
59         pthread_mutex_destroy(&comm->lock);
60         return ENOMEM;
61     }
62
63     /* Allocate remote_limiters array and fill it in. Add remotes to map. */
64     comm->remote_limiters =
65                         malloc(config->peer_count * sizeof(remote_limiter_t));
66
67     if (comm->remote_limiters == NULL) {
68         pthread_mutex_destroy(&comm->lock);
69         free_map(comm->remote_node_map, 0);
70         return ENOMEM;
71     }
72
73     memset(comm->remote_limiters, 0, config->peer_count * sizeof(remote_limiter_t));
74
75     for (i = 0; i < config->peer_count; ++i) {
76         comm->remote_limiters[i].addr = remote_nodes[i].addr;
77         comm->remote_limiters[i].port = remote_nodes[i].port;
78         comm->remote_limiters[i].outgoing.next_seqno = 1;
79         map_insert(comm->remote_node_map, (void *) &(remote_nodes[i]),
80                    sizeof(remote_node_t), &comm->remote_limiters[i]);
81     }
82
83     /* Allocate and initialize retrys. */
84     comm->retrys = malloc(config->branch * sizeof(int));
85     if (comm->retrys == NULL) {
86         pthread_mutex_destroy(&comm->lock);
87         free_map(comm->remote_node_map, 0);
88         free(comm->remote_limiters);
89         return ENOMEM;
90     }
91
92     for (i = 0; i < config->branch; ++i) {
93         comm->retrys[i] = -1;
94     }
95
96     return 0;
97 }
98
99 void free_comm(comm_t *comm) {
100     if (comm) {
101         if (comm->remote_limiters) {
102             free(comm->remote_limiters);
103         }
104
105         if (comm->remote_nodes) {
106             free(comm->remote_nodes);
107         }
108
109         if (comm->remote_node_map) {
110             free_map(comm->remote_node_map, 0);
111         }
112
113         pthread_mutex_destroy(&comm->lock);
114
115         if (comm->retrys) {
116             free(comm->retrys);
117         }
118     }
119 }
120
/**
 * Read the current aggregate rate estimate out of the comm layer.
 *
 * MESH: sums every peer's last-reported rate (GRD) or weight (FPS) plus our
 * own local rate, while aging out peers that have gone silent.
 * GOSSIP: derives the aggregate from the push-sum value/weight pair and
 * reclaims value/weight sent to peers that have gone AWOL.
 *
 * @param comm      Comm state; its lock is taken for the whole read.
 * @param aggregate Out-parameter receiving the aggregate estimate.
 * @param decayto   Value an AWOL mesh peer's rate decays toward.
 * @return 0 on success, EINVAL if the comm fabric is unrecognized.
 */
int read_comm(comm_t *comm, double *aggregate, double decayto) {
    remote_limiter_t *remote;

    pthread_mutex_lock(&comm->lock);
    if (comm->comm_fabric == COMM_MESH) {
        *aggregate = 0;
        map_reset_iterate(comm->remote_node_map);
        while ((remote = map_next(comm->remote_node_map))) {
            /* remote->rate is the rate (GRD) or weight (FPS) generated by
             * this peer, as of its last update. */
            *aggregate += remote->rate;

            /* Each read without an intervening update bumps the peer's AWOL
             * count (presumably reset by the receive path — confirm).  Once
             * past the threshold, its stale value decays toward decayto,
             * halving the remaining distance on every read. */
            if (remote->awol >= MESH_REMOTE_AWOL_THRESHOLD) {
                printlog(LOG_WARN, "AWOL remote limiter detected.\n");
                remote->rate += ((decayto - remote->rate) / 2);
            } else {
                remote->awol++;
            }
        }
        *aggregate += comm->local_rate;
    } else if (comm->comm_fabric == COMM_GOSSIP) {
        int i;
        int threshold = GOSSIP_REMOTE_AWOL_THRESHOLD;
        double value = 0;
        /* Push-sum estimate: value/weight approximates the mean local rate,
         * scaled by the total node count (peers + self) to get the sum.
         * NOTE(review): no guard against gossip.weight == 0 here — verify
         * the protocol keeps weight strictly positive. */
        value = (comm->gossip.value / comm->gossip.weight);
        value *= (comm->remote_node_count + 1);

        /* Keep around the last value so that we don't stupidly pick 0 when
         * we're negative.  If we pick 0, it looks to the limiter like it
         * has free rein and it will take 100% of the rate allocation for
         * itself. This is a lie.  Open question what to do here... FIXME: Use decayto?*/
        if (value <= 0) {
            //*aggregate = comm->gossip.last_nonzero;
            *aggregate = 0;
            printlog(LOG_DEBUG, "Gossip: Read aggregate of 0 from comm layer.\n");
        } else {
            *aggregate = value;
            comm->gossip.last_nonzero = *aggregate;
            printlog(LOG_DEBUG, "Gossip: Read aggregate of %.3f from comm layer.\n", value);
        }

        /* Age peers.  Exactly at the threshold we reclaim any value/weight
         * still in flight to that peer (so mass isn't lost from the
         * push-sum invariant), then keep counting past it so the
         * reclamation happens only once. */
        for (i = 0; i < comm->remote_node_count; ++i) {
            if (comm->remote_limiters[i].awol == threshold) {
                /* Re-claim any value/weight sent. */
                comm->gossip.value += comm->remote_limiters[i].outgoing.saved_value;
                comm->gossip.weight += comm->remote_limiters[i].outgoing.saved_weight;

                comm->remote_limiters[i].outgoing.saved_value = 0.0;
                comm->remote_limiters[i].outgoing.saved_weight = 0.0;

                comm->remote_limiters[i].awol += 1;
            } else if (comm->remote_limiters[i].awol < threshold) {
                comm->remote_limiters[i].awol += 1;
            }
        }
    } else {
        printlog(LOG_CRITICAL, "read_comm: Invalid comm fabric: %d.\n",
                 comm->comm_fabric);
        pthread_mutex_unlock(&comm->lock);
        return EINVAL;
    }
    pthread_mutex_unlock(&comm->lock);

    return 0;
}
189
190 int write_local_value(comm_t *comm, const double value) {
191     pthread_mutex_lock(&comm->lock);
192     if (comm->comm_fabric == COMM_MESH) {
193         comm->last_local_rate = comm->local_rate;
194         comm->local_rate = value;
195         comm->rate_change = comm->local_rate - comm->last_local_rate;
196     } else if (comm->comm_fabric == COMM_GOSSIP) {
197         comm->last_local_rate = comm->local_rate;
198         comm->local_rate = value;
199         comm->rate_change = comm->local_rate - comm->last_local_rate;
200         /*printf("new: %f, old: %f, weight: %f, diff: %f\n", comm->gossip.value + (comm->gossip.weight * comm->rate_change), comm->gossip.value, comm->gossip.weight, comm->rate_change);*/
201         /*comm->gossip.value = comm->gossip.value + (comm->gossip.weight * comm->rate_change);*/
202         comm->gossip.value += comm->rate_change;
203     }
204     else {
205         printlog(LOG_CRITICAL, "write_local_value: Invalid comm fabric: %d.\n",
206                  comm->comm_fabric);
207         pthread_mutex_unlock(&comm->lock);
208         return EINVAL;
209     }
210     pthread_mutex_unlock(&comm->lock);
211
212     return 0;
213 }
214
215 int send_update(comm_t *comm, uint32_t id) {
216     int result = 0;
217
218     pthread_mutex_lock(&comm->lock);
219
220     result = comm->send_function(comm, id, limiter.udp_socket);
221
222     pthread_mutex_unlock(&comm->lock);
223
224     return result;
225 }
226
227 void *limiter_receive_thread(void *unused) {
228     sigset_t signal_mask;
229
230     sigemptyset(&signal_mask);
231     sigaddset(&signal_mask, SIGHUP);
232     sigaddset(&signal_mask, SIGUSR1);
233     sigaddset(&signal_mask, SIGUSR2);
234     pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
235
236     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
237     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
238     while (1) {
239         limiter_receive();
240     }
241     pthread_exit(NULL);
242 }