1 /* See the DRL-LICENSE file for this file's software license. */
6 #define _XOPEN_SOURCE 600
12 #include <sys/types.h>
15 /* in_addr_t, in_port_t */
16 #include <arpa/inet.h>
18 /* pthread functions. */
22 #include <sys/select.h>
24 /* Hash map types/functions. */
31 #define MAX_IDENTS (1024)
32 #define MAX_LIMITERS (128)
34 #define MESH_REMOTE_AWOL_THRESHOLD (5)
36 //FIXME: Make this more scientific?
37 #define GOSSIP_REMOTE_AWOL_THRESHOLD (10 * comm->remote_node_count / comm->gossip.gossip_branch)
39 enum transports { UDP, TCP };
41 typedef struct gossipval {
48 typedef struct out_neighbor {
55 typedef struct in_neighbor {
61 typedef struct remote_node {
66 typedef struct remote_limiter {
67 /** The last known value at the remote limiter. */
70 in_neighbor_t incoming;
71 out_neighbor_t outgoing;
73 /* Socket to contact this remote limiter, if using TCP. */
79 /** Flag to keep track of situations in which we read from this node's
80 * value more than once before receiving an update from it. We use this
81 * value to know when it's safe to begin decaying the remote node's value
82 * (because we assume that it has failed). */
88 /** Communication policy. (COMM_MESH, COMM_GOSSIP) */
89 enum commfabrics comm_fabric;
91 /** Transport protocol. */
92 enum transports transport_proto;
94 /** Current local value. */
97 /** Previous local value. */
98 double last_local_rate;
102 /** The number of remote nodes in the identity */
103 uint32_t remote_node_count;
105 remote_node_t *remote_nodes;
107 /** Array containing all known remote limiters in this identity.
108 * Contains the same information as the remote_node_map. */
109 remote_limiter_t *remote_limiters;
111 /** Hash map containing all the remote limiters in this identity.
112 * Indexed by the remote_node_t used to create the remote_limiter_t.
113 * Maps to the appropriate element of the remote_limiters array. */
114 map_handle remote_node_map;
116 /** A mutex to protect the comm structure. */
117 pthread_mutex_t lock;
119 /** Gossip values for our local identity */
122 /** Function pointer to send function. */
123 int (*send_function)(struct comm *comm, uint32_t id, int sock);
126 /** Thread for handling incoming TCP data. */
127 pthread_t tcp_recv_thread;
129 /** Descriptor set for reading TCP messages */
133 /** Array of integers specifiying which nodes, if any, have outstanding
134 * unacked data. When nodes fall in this category, and it's time to send,
135 * these nodes will be chosen first. This only affects gossip. A
136 * negative number means there is no retransmit necessary. Otherwise, the
137 * value is the index into the remote_limiters array of the necessary
143 typedef struct message {
153 typedef struct hello_message {
160 struct recv_thread_args {
162 pthread_rwlock_t *lock;
169 * Initializes the global limiter.
171 * @param ipaddr The IP address on which the limiter should listen.
172 * INADDR_ANY will suffice. Should be specified in network byte order.
174 * @param port The port on which the limiter should listen. Should be specified
175 * in network byte order.
177 void init_limiter(const in_addr_t ipaddr, const in_port_t port);
180 * Deallocates the entire global limiter.
182 void destroy_limiter();
186 * Fills in the communication structure of an identity.
188 * @param comm The communication structure to be created/populated.
190 * @param config The configuration options for the identity.
192 * @param nodes An array of remote nodes belonging to this identity.
194 * @returns 0 on success, ENOMEM if memory cannot be allocated.
196 int new_comm(comm_t *comm, ident_config *config, remote_node_t *nodes);
199 * Frees the memory associated with an identity's communication structure.
201 * @param comm The communication structure to free.
203 void free_comm(comm_t *comm);
206 * Calculates and reads the current aggregate value for an identity.
207 * This value includes the locally observed value.
209 * @param comm The comm structure of the identity in question.
211 * @param aggregate The location at which the aggregate value will
214 * @param decayto When using a mesh comm fabric, limiters whose value
215 * has not been heard in several timesteps will decay to this value.
216 * Generally globallimit/N.
218 * @returns 0 on success, EINVAL on error.
220 int read_comm(comm_t *comm, double *aggregate, double decayto);
223 * Updates the locally observed value of an identity.
225 * @param comm The comm structure of the identity to update.
227 * @param value The new locally observed value.
229 * @returns 0 on success, EINVAL on error.
231 int write_local_value(comm_t *comm, const double value);
234 * Sends the local state information to one or more peer limiters in the same
235 * identity. If the identity is configured as a mesh, it will send to all
236 * peers. If the identity is configured using gossip, it will send to the
237 * number of peers specified by the gossip_branch field of the comm_config_t
238 * that was used to configure the identity.
240 * @param comm The communication structure of the identity whose value should
243 * @param id The unique id of the identity that is sending.
245 * @returns 0 on success, ENOMEM if there was not enough memory, or possibly
246 * other E values as a result of socket-related results.
248 int send_update(comm_t *comm, uint32_t id);
251 * Thread that is responsible for receiving data from other limiters.
253 * @param limiter The limiter_t that is to be doing the receiving.
255 void *limiter_receive_thread(void *unused);
257 #endif /* _DRL_STATE_ */