Importing all of DRL, including ulogd and all of its files.
[distributedratelimiting.git] / drl / drl_state.c
1 /* See the DRL-LICENSE file for this file's software license. */
2
3 /** Allows us to use pthread_rwlocks. */
4 #define _XOPEN_SOURCE 600
5
6 /* malloc(), NULL */
7 #include <stdlib.h>
8
9 /* getpid() */
10 #include <unistd.h>
11
12 /* Socket functions. */
13 #include <sys/socket.h>
14
15 /* memset() */
16 #include <string.h>
17
18 /* perror() */
19 #include <errno.h>
20
21 /* FD_ZERO() */
22 #include <sys/select.h>
23
24 #include <assert.h>
25
26 #include "raterouter.h"
27 #include "ratetypes.h"
28 #include "drl_state.h"
29 #include "peer_comm.h"
30 #include "logging.h"
31
32 extern limiter_t limiter;
33
34 int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) {
35     int i;
36
37     memset(comm, 0, sizeof(comm_t));
38
39     comm->comm_fabric = config->commfabric;
40     comm->transport_proto = UDP;
41     comm->remote_node_count = config->peer_count;
42     comm->gossip.gossip_branch = config->branch;
43     comm->gossip.weight = 1.0;
44
45     pthread_mutex_init(&comm->lock, NULL);
46
47     /* Set send function. */
48     switch (config->commfabric) {
49         case COMM_MESH:
50             comm->send_function = send_udp_mesh;
51             break;
52         case COMM_GOSSIP:
53             comm->send_function = send_udp_gossip;
54             break;
55     }
56
57     comm->remote_node_map = allocate_map();
58     if (comm->remote_node_map == NULL) {
59         pthread_mutex_destroy(&comm->lock);
60         return ENOMEM;
61     }
62
63     /* Allocate remote_limiters array and fill it in. Add remotes to map. */
64     comm->remote_limiters =
65                         malloc(config->peer_count * sizeof(remote_limiter_t));
66
67     if (comm->remote_limiters == NULL) {
68         pthread_mutex_destroy(&comm->lock);
69         free_map(comm->remote_node_map, 0);
70         return ENOMEM;
71     }
72
73     memset(comm->remote_limiters, 0, config->peer_count * sizeof(remote_limiter_t));
74
75     for (i = 0; i < config->peer_count; ++i) {
76         comm->remote_limiters[i].addr = remote_nodes[i].addr;
77         comm->remote_limiters[i].port = remote_nodes[i].port;
78         comm->remote_limiters[i].outgoing.next_seqno = 1;
79         map_insert(comm->remote_node_map, (void *) &(remote_nodes[i]),
80                    sizeof(remote_node_t), &comm->remote_limiters[i]);
81     }
82
83     /* Allocate and initialize retrys. */
84     comm->retrys = malloc(config->branch * sizeof(int));
85     if (comm->retrys == NULL) {
86         pthread_mutex_destroy(&comm->lock);
87         free_map(comm->remote_node_map, 0);
88         free(comm->remote_limiters);
89         return ENOMEM;
90     }
91
92     for (i = 0; i < config->branch; ++i) {
93         comm->retrys[i] = -1;
94     }
95
96     return 0;
97 }
98
99 void free_comm(comm_t *comm) {
100     if (comm) {
101         if (comm->remote_limiters) {
102             free(comm->remote_limiters);
103         }
104
105         if (comm->remote_nodes) {
106             free(comm->remote_nodes);
107         }
108
109         if (comm->remote_node_map) {
110             free_map(comm->remote_node_map, 0);
111         }
112
113         pthread_mutex_destroy(&comm->lock);
114
115         if (comm->retrys) {
116             free(comm->retrys);
117         }
118     }
119 }
120
121 int read_comm(comm_t *comm, double *aggregate) {
122     remote_limiter_t *remote;
123
124     pthread_mutex_lock(&comm->lock);
125     if (comm->comm_fabric == COMM_MESH) {
126         *aggregate = 0;
127         map_reset_iterate(comm->remote_node_map);
128         while ((remote = map_next(comm->remote_node_map))) {
129             /* remote->rate corresponds to the rate (GRD) or weight (FPS)
130              * in generated by the peer remote. */
131             *aggregate += remote->rate;
132
133             /* If we continue to read it without having heard an update,
134              * we start to decay its value. */
135             if (remote->awol >= REMOTE_AWOL_THRESHOLD) {
136                 remote->rate = remote->rate / 2;
137             } else {
138                 remote->awol++;
139             }
140         }
141         *aggregate += comm->local_rate;
142     } else if (comm->comm_fabric == COMM_GOSSIP) {
143         double value = 0;
144         value = (comm->gossip.value / comm->gossip.weight);
145         value *= (comm->remote_node_count + 1);
146
147         /* Keep around the last value so that we don't stupidly pick 0 when
148          * we're negative.  If we pick 0, it looks to the limiter like it
149          * has free reign and it will take 100% of the rate allocation for
150          * itself. */
151         if (value <= 0) {
152             //*aggregate = comm->gossip.last_nonzero;
153             *aggregate = 0;
154             //printf("*****Gossip value is %.3f  (%u)  ((%d))\n", value, *aggregate, (int) *aggregate);
155         } else {
156             *aggregate = value;
157             comm->gossip.last_nonzero = *aggregate;
158             //printf("Gossip value is %.3f  (%u)  ((%d))\n", value, *aggregate, (int) *aggregate);
159         }
160     } else {
161         printlog(LOG_CRITICAL, "read_comm: Invalid comm fabric: %d.\n",
162                  comm->comm_fabric);
163         pthread_mutex_unlock(&comm->lock);
164         return EINVAL;
165     }
166     pthread_mutex_unlock(&comm->lock);
167
168     //printf("read: %.3f\n", *aggregate);
169
170     return 0;
171 }
172
173 int write_local_value(comm_t *comm, const double value) {
174     pthread_mutex_lock(&comm->lock);
175     if (comm->comm_fabric == COMM_MESH) {
176         comm->last_local_rate = comm->local_rate;
177         comm->local_rate = value;
178         comm->rate_change = comm->local_rate - comm->last_local_rate;
179     } else if (comm->comm_fabric == COMM_GOSSIP) {
180         comm->last_local_rate = comm->local_rate;
181         comm->local_rate = value;
182         comm->rate_change = comm->local_rate - comm->last_local_rate;
183         /*printf("new: %f, old: %f, weight: %f, diff: %f\n", comm->gossip.value + (comm->gossip.weight * comm->rate_change), comm->gossip.value, comm->gossip.weight, comm->rate_change);*/
184         /*comm->gossip.value = comm->gossip.value + (comm->gossip.weight * comm->rate_change);*/
185         comm->gossip.value += comm->rate_change;
186     }
187     else {
188         printlog(LOG_CRITICAL, "write_local_value: Invalid comm fabric: %d.\n",
189                  comm->comm_fabric);
190         pthread_mutex_unlock(&comm->lock);
191         return EINVAL;
192     }
193     pthread_mutex_unlock(&comm->lock);
194
195     return 0;
196 }
197
198 int send_update(comm_t *comm, uint32_t id) {
199     int result = 0;
200
201     pthread_mutex_lock(&comm->lock);
202
203     result = comm->send_function(comm, id, limiter.udp_socket);
204
205     pthread_mutex_unlock(&comm->lock);
206
207     return result;
208 }
209
210 void *limiter_receive_thread(void *unused) {
211     sigset_t signal_mask;
212
213     sigemptyset(&signal_mask);
214     sigaddset(&signal_mask, SIGHUP);
215     pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
216
217     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
218     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
219     while (1) {
220         limiter_receive();
221     }
222     pthread_exit(NULL);
223 }