Reincarnated GRD. Changed mesh decay to go to 1/N rather than 0.
[distributedratelimiting.git] / drl / drl_state.c
1 /* See the DRL-LICENSE file for this file's software license. */
2
3 /** Allows us to use pthread_rwlocks. */
4 #define _XOPEN_SOURCE 600
5
6 /* malloc(), NULL */
7 #include <stdlib.h>
8
9 /* getpid() */
10 #include <unistd.h>
11
12 /* Socket functions. */
13 #include <sys/socket.h>
14
15 /* memset() */
16 #include <string.h>
17
18 /* perror() */
19 #include <errno.h>
20
21 /* FD_ZERO() */
22 #include <sys/select.h>
23
24 #include <assert.h>
25
26 #include "raterouter.h"
27 #include "ratetypes.h"
28 #include "drl_state.h"
29 #include "peer_comm.h"
30 #include "logging.h"
31
32 extern limiter_t limiter;
33
34 int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) {
35     int i;
36
37     memset(comm, 0, sizeof(comm_t));
38
39     comm->comm_fabric = config->commfabric;
40     comm->transport_proto = UDP;
41     comm->remote_node_count = config->peer_count;
42     comm->gossip.gossip_branch = config->branch;
43     comm->gossip.weight = 1.0;
44
45     pthread_mutex_init(&comm->lock, NULL);
46
47     /* Set send function. */
48     switch (config->commfabric) {
49         case COMM_MESH:
50             comm->send_function = send_udp_mesh;
51             break;
52         case COMM_GOSSIP:
53             comm->send_function = send_udp_gossip;
54             break;
55     }
56
57     comm->remote_node_map = allocate_map();
58     if (comm->remote_node_map == NULL) {
59         pthread_mutex_destroy(&comm->lock);
60         return ENOMEM;
61     }
62
63     /* Allocate remote_limiters array and fill it in. Add remotes to map. */
64     comm->remote_limiters =
65                         malloc(config->peer_count * sizeof(remote_limiter_t));
66
67     if (comm->remote_limiters == NULL) {
68         pthread_mutex_destroy(&comm->lock);
69         free_map(comm->remote_node_map, 0);
70         return ENOMEM;
71     }
72
73     memset(comm->remote_limiters, 0, config->peer_count * sizeof(remote_limiter_t));
74
75     for (i = 0; i < config->peer_count; ++i) {
76         comm->remote_limiters[i].addr = remote_nodes[i].addr;
77         comm->remote_limiters[i].port = remote_nodes[i].port;
78         comm->remote_limiters[i].outgoing.next_seqno = 1;
79         map_insert(comm->remote_node_map, (void *) &(remote_nodes[i]),
80                    sizeof(remote_node_t), &comm->remote_limiters[i]);
81     }
82
83     /* Allocate and initialize retrys. */
84     comm->retrys = malloc(config->branch * sizeof(int));
85     if (comm->retrys == NULL) {
86         pthread_mutex_destroy(&comm->lock);
87         free_map(comm->remote_node_map, 0);
88         free(comm->remote_limiters);
89         return ENOMEM;
90     }
91
92     for (i = 0; i < config->branch; ++i) {
93         comm->retrys[i] = -1;
94     }
95
96     return 0;
97 }
98
99 void free_comm(comm_t *comm) {
100     if (comm) {
101         if (comm->remote_limiters) {
102             free(comm->remote_limiters);
103         }
104
105         if (comm->remote_nodes) {
106             free(comm->remote_nodes);
107         }
108
109         if (comm->remote_node_map) {
110             free_map(comm->remote_node_map, 0);
111         }
112
113         pthread_mutex_destroy(&comm->lock);
114
115         if (comm->retrys) {
116             free(comm->retrys);
117         }
118     }
119 }
120
121 int read_comm(comm_t *comm, double *aggregate, double decayto) {
122     remote_limiter_t *remote;
123
124     pthread_mutex_lock(&comm->lock);
125     if (comm->comm_fabric == COMM_MESH) {
126         *aggregate = 0;
127         map_reset_iterate(comm->remote_node_map);
128         while ((remote = map_next(comm->remote_node_map))) {
129             /* remote->rate corresponds to the rate (GRD) or weight (FPS)
130              * in generated by the peer remote. */
131             *aggregate += remote->rate;
132
133             /* If we continue to read it without having heard an update,
134              * we start to make the peer's value approach decayto, getting
135              * half of the way there each time. */
136             if (remote->awol >= REMOTE_AWOL_THRESHOLD) {
137                 remote->rate += ((decayto - remote->rate) / 2);
138             } else {
139                 remote->awol++;
140             }
141         }
142         *aggregate += comm->local_rate;
143     } else if (comm->comm_fabric == COMM_GOSSIP) {
144         double value = 0;
145         value = (comm->gossip.value / comm->gossip.weight);
146         value *= (comm->remote_node_count + 1);
147
148         /* Keep around the last value so that we don't stupidly pick 0 when
149          * we're negative.  If we pick 0, it looks to the limiter like it
150          * has free reign and it will take 100% of the rate allocation for
151          * itself. */
152         if (value <= 0) {
153             //*aggregate = comm->gossip.last_nonzero;
154             *aggregate = 0;
155             //printf("*****Gossip value is %.3f  (%u)  ((%d))\n", value, *aggregate, (int) *aggregate);
156         } else {
157             *aggregate = value;
158             comm->gossip.last_nonzero = *aggregate;
159             //printf("Gossip value is %.3f  (%u)  ((%d))\n", value, *aggregate, (int) *aggregate);
160         }
161     } else {
162         printlog(LOG_CRITICAL, "read_comm: Invalid comm fabric: %d.\n",
163                  comm->comm_fabric);
164         pthread_mutex_unlock(&comm->lock);
165         return EINVAL;
166     }
167     pthread_mutex_unlock(&comm->lock);
168
169     //printf("read: %.3f\n", *aggregate);
170
171     return 0;
172 }
173
174 int write_local_value(comm_t *comm, const double value) {
175     pthread_mutex_lock(&comm->lock);
176     if (comm->comm_fabric == COMM_MESH) {
177         comm->last_local_rate = comm->local_rate;
178         comm->local_rate = value;
179         comm->rate_change = comm->local_rate - comm->last_local_rate;
180     } else if (comm->comm_fabric == COMM_GOSSIP) {
181         comm->last_local_rate = comm->local_rate;
182         comm->local_rate = value;
183         comm->rate_change = comm->local_rate - comm->last_local_rate;
184         /*printf("new: %f, old: %f, weight: %f, diff: %f\n", comm->gossip.value + (comm->gossip.weight * comm->rate_change), comm->gossip.value, comm->gossip.weight, comm->rate_change);*/
185         /*comm->gossip.value = comm->gossip.value + (comm->gossip.weight * comm->rate_change);*/
186         comm->gossip.value += comm->rate_change;
187     }
188     else {
189         printlog(LOG_CRITICAL, "write_local_value: Invalid comm fabric: %d.\n",
190                  comm->comm_fabric);
191         pthread_mutex_unlock(&comm->lock);
192         return EINVAL;
193     }
194     pthread_mutex_unlock(&comm->lock);
195
196     return 0;
197 }
198
199 int send_update(comm_t *comm, uint32_t id) {
200     int result = 0;
201
202     pthread_mutex_lock(&comm->lock);
203
204     result = comm->send_function(comm, id, limiter.udp_socket);
205
206     pthread_mutex_unlock(&comm->lock);
207
208     return result;
209 }
210
211 void *limiter_receive_thread(void *unused) {
212     sigset_t signal_mask;
213
214     sigemptyset(&signal_mask);
215     sigaddset(&signal_mask, SIGHUP);
216     pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
217
218     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
219     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
220     while (1) {
221         limiter_receive();
222     }
223     pthread_exit(NULL);
224 }