1 /* See the DRL-LICENSE file for this file's software license. */
6 #define _XOPEN_SOURCE 600
12 #include <sys/types.h>
15 /* in_addr_t, in_port_t */
16 #include <arpa/inet.h>
18 /* pthread functions. */
22 #include <sys/select.h>
24 /* Hash map types/functions. */
/* Compile-time capacity limits. Where these bounds are enforced is not
 * visible in this header. */
#define MAX_IDENTS   (1024) /* limit on identities */
#define MAX_LIMITERS (128)  /* limit on limiters */

/** Transport protocol used for peer communication (see comm_t.transport_proto). */
enum transports {
    UDP = 0,
    TCP = 1
};

/** Confidence about whether a remote limiter is in the current view
 *  (see remote_limiter_t.view_confidence). */
enum view_confidences {
    IN     = 0,
    NOTIN  = 1,
    UNSURE = 2
};

/** Whether a peer is believed to be reachable
 *  (see remote_limiter_t.reachability). */
enum reachabilities {
    REACHABLE   = 0,
    SUSPECT     = 1,
    UNREACHABLE = 2
};
40 typedef struct gossipval {
41 /* Fields that don't change. */
43 enum memberships membership;
44 enum failure_behaviors failure_behavior;
46 /* Fields that change only on restart. */
49 /* Fields that change frequently. */
54 typedef struct out_neighbor {
61 typedef struct in_neighbor {
67 typedef struct remote_node {
73 typedef struct remote_limiter {
74 /** The last known value at the remote limiter. */
77 in_neighbor_t incoming;
78 out_neighbor_t outgoing;
80 /* Socket to contact this remote limiter, if using TCP. */
83 /** IP address of the remote limiter, in network byte order. */
87 /** Keeps track of the number of messages we have sent to this peer without
88 * having heard from them. */
91 /** Whether or not we think this peer is reachable. */
92 enum reachabilities reachability;
/** Count of rounds since doubt arose, and count of friends that
 * suspect this node to be AWOL or alive. */
100 uint32_t incarnation;
103 enum view_confidences view_confidence;
106 //TODO: Reduce the size of this?
107 typedef struct message {
116 /** tell ping target the address of node which requested ping */
117 in_addr_t ping_source;
119 /** friend needs to be told the address of node suspected to be down */
120 in_addr_t check_target;
121 in_port_t check_port;
122 /** friend responds with ALIVE / AWOL */
123 uint32_t checkack_value;
/* Whether the message has an update piggybacked onto it. */
125 uint32_t update_present; // TRUE or FALSE
126 /*Node is reachable or not*/
127 uint32_t reachability;
/* Incarnation number of the node whose update
 * is piggybacked on this message. */
130 uint32_t incarnation;
131 /*Address of the node whose update is being sent*/
137 typedef struct comm {
138 /** Communication policy. (COMM_MESH, COMM_GOSSIP) */
139 enum commfabrics comm_fabric;
141 /** Transport protocol. */
142 enum transports transport_proto;
144 /** Current local value. */
147 /** Previous local value. */
148 double last_local_rate;
150 /** The number of remote nodes in the identity */
151 uint32_t remote_node_count;
153 remote_node_t *remote_nodes;
155 /** Array containing all known remote limiters in this identity.
156 * Contains the same information as the remote_node_map. */
157 remote_limiter_t *remote_limiters;
159 /** Hash map containing all the remote limiters in this identity.
160 * Indexed by the remote_node_t used to create the remote_limiter_t.
161 * Maps to the appropriate element of the remote_limiters array. */
162 map_handle remote_node_map;
164 /** A mutex to protect the comm structure. */
165 pthread_mutex_t lock;
167 /** Gossip values for our local identity */
170 /** Function pointer to send function. */
171 int (*send_function)(struct comm *comm, uint32_t id, int sock);
/** Function pointer to recv function for group membership. When a message
 * is received, it is processed normally and then handed to this function
 * in case additional processing is necessary for group membership. */
176 int (*recv_function)(struct comm *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg);
178 /** Function to restart the communication protocol. */
179 void (*restart_function)(struct comm *comm, int32_t view_number);
181 /** Flag indicating whether or not we are "connected" to the group
182 * membership service. This can only be false for membership schemes that
183 * require a persistent connection (Zookeeper). */
/** Array of integers specifying which nodes have been selected for
 * message transmissions during the current round. */
/** Array of indices into remote_limiters. Used to keep a shuffled
 * ordering for future gossip targets. */
/** The next index to use for target peer selection. The indices are
 * reshuffled when this reaches remote_node_count. */
196 uint32_t shuffle_index;
198 void *membership_state;
201 /** Thread for handling incoming TCP data. */
202 pthread_t tcp_recv_thread;
204 /** Descriptor set for reading TCP messages */
210 * Fills in the communication structure of an identity.
212 * @param comm The communication structure to be created/populated.
214 * @param config The configuration options for the identity.
216 * @param nodes An array of remote nodes belonging to this identity.
218 * @returns 0 on success, ENOMEM if memory cannot be allocated.
220 int new_comm(comm_t *comm, ident_config *config, remote_node_t *nodes);
223 * Frees the memory associated with an identity's communication structure.
225 * @param comm The communication structure to free.
227 void free_comm(comm_t *comm);
230 * Calculates and reads the current aggregate value for an identity.
231 * This value includes the locally observed value.
233 * @returns 0 on success, EINVAL on error.
235 int read_comm(double *aggregate, uint32_t *effective_global, comm_t *comm, uint32_t global_limit);
238 * Updates the locally observed value of an identity.
240 * @param comm The comm structure of the identity to update.
242 * @param value The new locally observed value.
244 * @returns 0 on success, EINVAL on error.
246 int write_local_value(comm_t *comm, const double value);
249 * Sends the local state information to one or more peer limiters in the same
250 * identity. If the identity is configured as a mesh, it will send to all
251 * peers. If the identity is configured using gossip, it will send to the
252 * number of peers specified by the gossip_branch field of the comm_config_t
253 * that was used to configure the identity.
255 * @param comm The communication structure of the identity whose value should
258 * @param id The unique id of the identity that is sending.
260 * @returns 0 on success, ENOMEM if there was not enough memory, or possibly
261 * other E values as a result of socket-related results.
263 int send_update(comm_t *comm, uint32_t id);
/**
 * Thread that is responsible for receiving data from other limiters.
 *
 * The void * parameter and return type match the start-routine signature
 * expected by pthread_create().
 *
 * @param unused Presumably ignored by the implementation, as its name
 * suggests (TODO confirm).
 */
void *limiter_receive_thread(void *unused);
273 typedef struct hello_message {
279 struct recv_thread_args {
281 pthread_rwlock_t *lock;
/**
 * Initializes the global limiter.
 *
 * @param ipaddr The IP address on which the limiter should listen.
 * INADDR_ANY will suffice. Should be specified in network byte order.
 *
 * @param port The port on which the limiter should listen. Should be specified
 * in network byte order.
 */
/* Top-level const on by-value parameters is dropped here: it has no effect
 * in a declaration, and the definition may keep it. */
void init_limiter(in_addr_t ipaddr, in_port_t port);
/**
 * Deallocates the entire global limiter.
 */
/* (void), not (): an empty parameter list is not a prototype before C23,
 * so calls with stray arguments would not be diagnosed. */
void destroy_limiter(void);
302 #endif /* _DRL_STATE_ */