Added an 'independent' option for set identites that will put them under the
[distributedratelimiting.git] / drl / drl_state.h
1 /* See the DRL-LICENSE file for this file's software license. */
2
3 #ifndef _DRL_STATE_
4 #define _DRL_STATE_
5
6 #define _XOPEN_SOURCE 600
7
8 /* FILE */
9 #include <stdio.h>
10
11 /* uint32_t */
12 #include <sys/types.h>
13 #include <inttypes.h>
14
15 /* in_addr_t, in_port_t */
16 #include <arpa/inet.h>
17
18 /* pthread functions. */
19 #include <pthread.h>
20
21 /* fd_set */
22 #include <sys/select.h>
23
24 /* Hash map types/functions. */
25 #ifdef STANDALONE
26 #include "../util.h"
27 #else
28 #include "util.h"
29 #endif
30
31 #define MAX_IDENTS (1024)
32 #define MAX_LIMITERS (128)
33
34 #define MESH_REMOTE_AWOL_THRESHOLD (5)
35
36 //FIXME: Make this more scientific?
37 #define GOSSIP_REMOTE_AWOL_THRESHOLD (10 * comm->remote_node_count / comm->gossip.gossip_branch)
38
39 enum transports { UDP, TCP };
40
41 typedef struct gossipval {
42     int gossip_branch;
43     double last_nonzero;
44     double value;
45     double weight;
46 } gossip_t;
47
48 typedef struct out_neighbor {
49     uint32_t next_seqno;
50     uint32_t first_seqno;
51     double saved_value;
52     double saved_weight;
53 } out_neighbor_t;
54
55 typedef struct in_neighbor {
56     uint32_t seen_seqno;
57     double saved_value;
58     double saved_weight;
59 } in_neighbor_t;
60
61 typedef struct remote_node {
62     in_addr_t addr;
63     in_port_t port;
64 } remote_node_t;
65
66 typedef struct remote_limiter {
67     /** The last known value at the remote limiter. */
68     double rate;
69
70     in_neighbor_t incoming;
71     out_neighbor_t outgoing;
72
73     /* Socket to contact this remote limiter, if using TCP. */
74     int socket;
75
76     in_addr_t addr;
77     in_port_t port;
78
79     /** Flag to keep track of situations in which we read from this node's
80      * value more than once before receiving an update from it.  We use this
81      * value to know when it's safe to begin decaying the remote node's value
82      * (because we assume that it has failed). */
83     int awol;
84
85 } remote_limiter_t;
86
87 typedef struct comm {
88     /** Communication policy. (COMM_MESH, COMM_GOSSIP) */
89     enum commfabrics comm_fabric;
90
91     /** Transport protocol. */
92     enum transports transport_proto;
93
94     /** Current local value. */
95     double local_rate;
96
97     /** Previous local value. */
98     double last_local_rate;
99
100     double rate_change;
101
102     /** The number of remote nodes in the identity */
103     uint32_t remote_node_count;
104
105     remote_node_t *remote_nodes;
106
107     /** Array containing all known remote limiters in this identity.
108      * Contains the same information as the remote_node_map. */
109     remote_limiter_t *remote_limiters;
110
111     /** Hash map containing all the remote limiters in this identity.
112      * Indexed by the remote_node_t used to create the remote_limiter_t.
113      * Maps to the appropriate element of the remote_limiters array. */
114     map_handle remote_node_map;
115
116     /** A mutex to protect the comm structure. */
117     pthread_mutex_t lock;
118
119     /** Gossip values for our local identity */
120     gossip_t gossip;
121
122     /** Function pointer to send function. */
123     int (*send_function)(struct comm *comm, uint32_t id, int sock);
124
125 #if 0
126     /** Thread for handling incoming TCP data. */
127     pthread_t tcp_recv_thread;
128
129     /** Descriptor set for reading TCP messages */
130     fd_set fds;
131 #endif
132
133     /** Array of integers specifiying which nodes, if any, have outstanding
134      * unacked data.  When nodes fall in this category, and it's time to send,
135      * these nodes will be chosen first.  This only affects gossip.  A
136      * negative number means there is no retransmit necessary.  Otherwise, the
137      * value is the index into the remote_limiters array of the necessary
138      * retransmit. */
139     int *retrys;
140
141 } comm_t;
142
143 typedef struct message {
144     uint32_t magic;
145     uint32_t ident_id;
146     uint32_t seqno;
147     uint32_t min_seqno;
148     double value;
149     double weight;
150     uint16_t type;
151 } message_t;
152
153 typedef struct hello_message {
154     uint32_t magic;
155     uint32_t ident_id;
156     uint16_t port;
157 } hello_t;
158
159 #if 0
160 struct recv_thread_args {
161     comm_ident_t *ident;
162     pthread_rwlock_t *lock;
163     uint16_t port;
164 };
165 #endif
166
167 #if 0
168 /**
169  * Initializes the global limiter.
170  *
171  * @param ipaddr The IP address on which the limiter should listen.
172  * INADDR_ANY will suffice.  Should be specified in network byte order.
173  *
174  * @param port The port on which the limiter should listen. Should be specified
175  * in network byte order.
176  */
177 void init_limiter(const in_addr_t ipaddr, const in_port_t port);
178
179 /**
180  * Deallocates the entire global limiter.
181  */
182 void destroy_limiter();
183 #endif
184
185 /**
186  * Fills in the communication structure of an identity.
187  *
188  * @param comm The communication structure to be created/populated.
189  *
190  * @param config The configuration options for the identity.
191  *
192  * @param nodes An array of remote nodes belonging to this identity.
193  *
194  * @returns 0 on success, ENOMEM if memory cannot be allocated.
195  */
196 int new_comm(comm_t *comm, ident_config *config, remote_node_t *nodes);
197
198 /**
199  * Frees the memory associated with an identity's communication structure.
200  *
201  * @param comm The communication structure to free.
202  */
203 void free_comm(comm_t *comm);
204
205 /**
206  * Calculates and reads the current aggregate value for an identity.
207  * This value includes the locally observed value.
208  *
209  * @param comm The comm structure of the identity in question.
210  *
211  * @param aggregate The location at which the aggregate value will
212  * be stored.
213  *
214  * @param decayto When using a mesh comm fabric, limiters whose value
215  * has not been heard in several timesteps will decay to this value.
216  * Generally globallimit/N.
217  *
218  * @returns 0 on success, EINVAL on error.
219  */
220 int read_comm(comm_t *comm, double *aggregate, double decayto);
221
222 /**
223  * Updates the locally observed value of an identity.
224  *
225  * @param comm The comm structure of the identity to update.
226  *
227  * @param value The new locally observed value.
228  *
229  * @returns 0 on success, EINVAL on error.
230  */
231 int write_local_value(comm_t *comm, const double value);
232
233 /**
234  * Sends the local state information to one or more peer limiters in the same
235  * identity.  If the identity is configured as a mesh, it will send to all
236  * peers.  If the identity is configured using gossip, it will send to the
237  * number of peers specified by the gossip_branch field of the comm_config_t
238  * that was used to configure the identity.
239  *
240  * @param comm The communication structure of the identity whose value should
241  * be propagated.
242  *
243  * @param id The unique id of the identity that is sending.
244  *
245  * @returns 0 on success, ENOMEM if there was not enough memory, or possibly
246  * other E values as a result of socket-related results.
247  */
248 int send_update(comm_t *comm, uint32_t id);
249
250 /**
251  * Thread that is responsible for receiving data from other limiters.
252  *
253  * @param limiter The limiter_t that is to be doing the receiving.
254  */
255 void *limiter_receive_thread(void *unused);
256
257 #endif  /* _DRL_STATE_ */