1 /* See the DRL-LICENSE file for this file's software license. */
6 #define _XOPEN_SOURCE 600 /* Feature-test macro: requests the X/Open 6 (SUSv3 / POSIX.1-2001) declarations from the system headers; it has to be defined before the first system header is included. */
12 #include <sys/types.h>
15 /* in_addr_t, in_port_t */
16 #include <arpa/inet.h>
18 /* pthread functions. */
22 #include <sys/select.h>
24 /* Hash map types/functions. */
31 #define MAX_IDENTS (1024) /* NOTE(review): presumably the upper bound on the number of identities -- confirm against the code that uses it. */
32 #define MAX_LIMITERS (128) /* NOTE(review): presumably the upper bound on the number of limiters -- confirm against the code that uses it. */
34 #define REMOTE_AWOL_THRESHOLD (5) /* NOTE(review): presumably the number of reads without a fresh update after which a remote limiter is treated as failed -- confirm. */
36 enum transports { UDP, TCP }; /* Transport protocol choices; this is the type of the comm structure's transport_proto field. */
38 typedef struct gossipval {
45 typedef struct out_neighbor {
52 typedef struct in_neighbor {
58 typedef struct remote_node {
63 typedef struct remote_limiter {
64 /** The last known value at the remote limiter. */
67 in_neighbor_t incoming;
68 out_neighbor_t outgoing;
70 /* Socket to contact this remote limiter, if using TCP. */
76 /** Flag to keep track of situations in which we read from this node's
77 * value more than once before receiving an update from it. We use this
78 * value to know when it's safe to begin decaying the remote node's value
79 * (because we assume that it has failed). */
85 /** Communication policy. (COMM_MESH, COMM_GOSSIP) */
86 enum commfabrics comm_fabric;
88 /** Transport protocol. */
89 enum transports transport_proto;
91 /** Current local value. */
94 /** Previous local value. */
95 double last_local_rate;
99 /** The number of remote nodes in the identity */
100 uint32_t remote_node_count;
102 remote_node_t *remote_nodes;
104 /** Array containing all known remote limiters in this identity.
105 * Contains the same information as the remote_node_map. */
106 remote_limiter_t *remote_limiters;
108 /** Hash map containing all the remote limiters in this identity.
109 * Indexed by the remote_node_t used to create the remote_limiter_t.
110 * Maps to the appropriate element of the remote_limiters array. */
111 map_handle remote_node_map;
113 /** A mutex to protect the comm structure. */
114 pthread_mutex_t lock;
116 /** Gossip values for our local identity */
119 /** Function pointer to send function. */
120 int (*send_function)(struct comm *comm, uint32_t id, int sock);
123 /** Thread for handling incoming TCP data. */
124 pthread_t tcp_recv_thread;
126 /** Descriptor set for reading TCP messages */
130 /** Array of integers specifying which nodes, if any, have outstanding
131 * unacked data. When nodes fall in this category, and it's time to send,
132 * these nodes will be chosen first. This only affects gossip. A
133 * negative number means there is no retransmit necessary. Otherwise, the
134 * value is the index into the remote_limiters array of the necessary
140 typedef struct message {
150 typedef struct hello_message {
157 struct recv_thread_args {
159 pthread_rwlock_t *lock;
166 * Initializes the global limiter.
168 * @param ipaddr The IP address on which the limiter should listen.
169 * INADDR_ANY will suffice. Should be specified in network byte order.
171 * @param port The port on which the limiter should listen. Should be specified
172 * in network byte order.
174 void init_limiter(const in_addr_t ipaddr, const in_port_t port);
177 * Deallocates the entire global limiter.
179 void destroy_limiter(void); /* (void) makes this a real prototype: the function takes no arguments, so a call that passes any is rejected at compile time. */
183 * Fills in the communication structure of an identity.
185 * @param comm The communication structure to be created/populated.
187 * @param config The configuration options for the identity.
189 * @param nodes An array of remote nodes belonging to this identity.
191 * @returns 0 on success, ENOMEM if memory cannot be allocated.
193 int new_comm(comm_t *comm, ident_config *config, remote_node_t *nodes);
196 * Frees the memory associated with an identity's communication structure.
198 * @param comm The communication structure to free.
200 void free_comm(comm_t *comm);
203 * Calculates and reads the current aggregate value for an identity.
204 * This value includes the locally observed value.
206 * @param comm The comm structure of the identity in question.
208 * @param aggregate The location at which the aggregate value will
211 * @param decayto When using a mesh comm fabric, limiters whose value
212 * has not been heard in several timesteps will decay to this value.
213 * Generally globallimit/N.
215 * @returns 0 on success, EINVAL on error.
217 int read_comm(comm_t *comm, double *aggregate, double decayto);
220 * Updates the locally observed value of an identity.
222 * @param comm The comm structure of the identity to update.
224 * @param value The new locally observed value.
226 * @returns 0 on success, EINVAL on error.
228 int write_local_value(comm_t *comm, const double value);
231 * Sends the local state information to one or more peer limiters in the same
232 * identity. If the identity is configured as a mesh, it will send to all
233 * peers. If the identity is configured using gossip, it will send to the
234 * number of peers specified by the gossip_branch field of the comm_config_t
235 * that was used to configure the identity.
237 * @param comm The communication structure of the identity whose value should
240 * @param id The unique id of the identity that is sending.
242 * @returns 0 on success, ENOMEM if there was not enough memory, or possibly
243 * other errno values resulting from socket-related failures.
245 int send_update(comm_t *comm, uint32_t id);
248 * Thread that is responsible for receiving data from other limiters.
250 * @param unused Opaque thread argument; the prototype names it `unused`, so it is presumably ignored.
252 void *limiter_receive_thread(void *unused);
254 #endif /* _DRL_STATE_ */