ea68f511e2b11684be17bc4f8e1a707acbce5167
[distributedratelimiting.git] / drl / drl_state.c
1 /* See the DRL-LICENSE file for this file's software license. */
2
3 /** Allows us to use pthread_rwlocks. */
4 #define _XOPEN_SOURCE 600
5
6 /* malloc(), NULL */
7 #include <stdlib.h>
8
9 /* getpid() */
10 #include <unistd.h>
11
12 /* Socket functions. */
13 #include <sys/socket.h>
14
15 /* memset() */
16 #include <string.h>
17
18 /* perror() */
19 #include <errno.h>
20
21 /* FD_ZERO() */
22 #include <sys/select.h>
23
24 #include <assert.h>
25
26 #include "raterouter.h"
27 #include "ratetypes.h"
28 #include "drl_state.h"
29 #include "peer_comm.h"
30 #include "logging.h"
31
32 extern limiter_t limiter;
33
34 int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) {
35     int i;
36
37     memset(comm, 0, sizeof(comm_t));
38
39     comm->comm_fabric = config->commfabric;
40     comm->transport_proto = UDP;
41     comm->remote_node_count = config->peer_count;
42     comm->gossip.gossip_branch = config->branch;
43     comm->gossip.weight = 1.0;
44
45     pthread_mutex_init(&comm->lock, NULL);
46
47     /* Set send function. */
48     switch (config->commfabric) {
49         case COMM_MESH:
50             comm->send_function = send_udp_mesh;
51             break;
52         case COMM_GOSSIP:
53             comm->send_function = send_udp_gossip;
54             break;
55     }
56
57     comm->remote_node_map = allocate_map();
58     if (comm->remote_node_map == NULL) {
59         pthread_mutex_destroy(&comm->lock);
60         return ENOMEM;
61     }
62
63     /* Allocate remote_limiters array and fill it in. Add remotes to map. */
64     comm->remote_limiters =
65                         malloc(config->peer_count * sizeof(remote_limiter_t));
66
67     if (comm->remote_limiters == NULL) {
68         pthread_mutex_destroy(&comm->lock);
69         free_map(comm->remote_node_map, 0);
70         return ENOMEM;
71     }
72
73     memset(comm->remote_limiters, 0, config->peer_count * sizeof(remote_limiter_t));
74
75     for (i = 0; i < config->peer_count; ++i) {
76         comm->remote_limiters[i].addr = remote_nodes[i].addr;
77         comm->remote_limiters[i].port = remote_nodes[i].port;
78         comm->remote_limiters[i].outgoing.next_seqno = 1;
79         map_insert(comm->remote_node_map, (void *) &(remote_nodes[i]),
80                    sizeof(remote_node_t), &comm->remote_limiters[i]);
81     }
82
83     /* Allocate and initialize retrys. */
84     comm->retrys = malloc(config->branch * sizeof(int));
85     if (comm->retrys == NULL) {
86         pthread_mutex_destroy(&comm->lock);
87         free_map(comm->remote_node_map, 0);
88         free(comm->remote_limiters);
89         return ENOMEM;
90     }
91
92     for (i = 0; i < config->branch; ++i) {
93         comm->retrys[i] = -1;
94     }
95
96     return 0;
97 }
98
99 void free_comm(comm_t *comm) {
100     if (comm) {
101         if (comm->remote_limiters) {
102             free(comm->remote_limiters);
103         }
104
105         if (comm->remote_nodes) {
106             free(comm->remote_nodes);
107         }
108
109         if (comm->remote_node_map) {
110             free_map(comm->remote_node_map, 0);
111         }
112
113         pthread_mutex_destroy(&comm->lock);
114
115         if (comm->retrys) {
116             free(comm->retrys);
117         }
118     }
119 }
120
121 int read_comm(comm_t *comm, double *aggregate, double decayto) {
122     remote_limiter_t *remote;
123
124     pthread_mutex_lock(&comm->lock);
125     if (comm->comm_fabric == COMM_MESH) {
126         *aggregate = 0;
127         map_reset_iterate(comm->remote_node_map);
128         while ((remote = map_next(comm->remote_node_map))) {
129             /* remote->rate corresponds to the rate (GRD) or weight (FPS)
130              * in generated by the peer remote. */
131             *aggregate += remote->rate;
132
133             /* If we continue to read it without having heard an update,
134              * we start to make the peer's value approach decayto, getting
135              * half of the way there each time. */
136             if (remote->awol >= REMOTE_AWOL_THRESHOLD) {
137                 printlog(LOG_WARN, "AWOL remote limiter detected.\n");
138                 remote->rate += ((decayto - remote->rate) / 2);
139             } else {
140                 remote->awol++;
141             }
142         }
143         *aggregate += comm->local_rate;
144     } else if (comm->comm_fabric == COMM_GOSSIP) {
145         double value = 0;
146         value = (comm->gossip.value / comm->gossip.weight);
147         value *= (comm->remote_node_count + 1);
148
149         /* Keep around the last value so that we don't stupidly pick 0 when
150          * we're negative.  If we pick 0, it looks to the limiter like it
151          * has free reign and it will take 100% of the rate allocation for
152          * itself. */
153         if (value <= 0) {
154             //*aggregate = comm->gossip.last_nonzero;
155             *aggregate = 0;
156             //printf("*****Gossip value is %.3f  (%u)  ((%d))\n", value, *aggregate, (int) *aggregate);
157         } else {
158             *aggregate = value;
159             comm->gossip.last_nonzero = *aggregate;
160             //printf("Gossip value is %.3f  (%u)  ((%d))\n", value, *aggregate, (int) *aggregate);
161         }
162     } else {
163         printlog(LOG_CRITICAL, "read_comm: Invalid comm fabric: %d.\n",
164                  comm->comm_fabric);
165         pthread_mutex_unlock(&comm->lock);
166         return EINVAL;
167     }
168     pthread_mutex_unlock(&comm->lock);
169
170     //printf("read: %.3f\n", *aggregate);
171
172     return 0;
173 }
174
175 int write_local_value(comm_t *comm, const double value) {
176     pthread_mutex_lock(&comm->lock);
177     if (comm->comm_fabric == COMM_MESH) {
178         comm->last_local_rate = comm->local_rate;
179         comm->local_rate = value;
180         comm->rate_change = comm->local_rate - comm->last_local_rate;
181     } else if (comm->comm_fabric == COMM_GOSSIP) {
182         comm->last_local_rate = comm->local_rate;
183         comm->local_rate = value;
184         comm->rate_change = comm->local_rate - comm->last_local_rate;
185         /*printf("new: %f, old: %f, weight: %f, diff: %f\n", comm->gossip.value + (comm->gossip.weight * comm->rate_change), comm->gossip.value, comm->gossip.weight, comm->rate_change);*/
186         /*comm->gossip.value = comm->gossip.value + (comm->gossip.weight * comm->rate_change);*/
187         comm->gossip.value += comm->rate_change;
188     }
189     else {
190         printlog(LOG_CRITICAL, "write_local_value: Invalid comm fabric: %d.\n",
191                  comm->comm_fabric);
192         pthread_mutex_unlock(&comm->lock);
193         return EINVAL;
194     }
195     pthread_mutex_unlock(&comm->lock);
196
197     return 0;
198 }
199
200 int send_update(comm_t *comm, uint32_t id) {
201     int result = 0;
202
203     pthread_mutex_lock(&comm->lock);
204
205     result = comm->send_function(comm, id, limiter.udp_socket);
206
207     pthread_mutex_unlock(&comm->lock);
208
209     return result;
210 }
211
212 void *limiter_receive_thread(void *unused) {
213     sigset_t signal_mask;
214
215     sigemptyset(&signal_mask);
216     sigaddset(&signal_mask, SIGHUP);
217     sigaddset(&signal_mask, SIGUSR1);
218     sigaddset(&signal_mask, SIGUSR2);
219     pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
220
221     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
222     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
223     while (1) {
224         limiter_receive();
225     }
226     pthread_exit(NULL);
227 }