/* See the DRL-LICENSE file for this file's software license. */ #ifndef _DRL_STATE_ #define _DRL_STATE_ #define _XOPEN_SOURCE 600 /* FILE */ #include /* uint32_t */ #include #include /* in_addr_t, in_port_t */ #include /* pthread functions. */ #include /* fd_set */ #include /* Hash map types/functions. */ #ifdef STANDALONE #include "../util.h" #else #include "util.h" #endif #define MAX_IDENTS (1024) #define MAX_LIMITERS (128) #define REMOTE_AWOL_THRESHOLD (3) enum transports { UDP, TCP }; typedef struct gossipval { int gossip_branch; double last_nonzero; double value; double weight; } gossip_t; typedef struct out_neighbor { uint32_t next_seqno; uint32_t first_seqno; double saved_value; double saved_weight; } out_neighbor_t; typedef struct in_neighbor { uint32_t seen_seqno; double saved_value; double saved_weight; } in_neighbor_t; typedef struct remote_node { in_addr_t addr; in_port_t port; } remote_node_t; typedef struct remote_limiter { /** The last known value at the remote limiter. */ double rate; in_neighbor_t incoming; out_neighbor_t outgoing; /* Socket to contact this remote limiter, if using TCP. */ int socket; in_addr_t addr; in_port_t port; /** Flag to keep track of situations in which we read from this node's * value more than once before receiving an update from it. We use this * value to know when it's safe to begin decaying the remote node's value * (because we assume that it has failed). */ int awol; } remote_limiter_t; typedef struct comm { /** Communication policy. (COMM_MESH, COMM_GOSSIP) */ enum commfabrics comm_fabric; /** Transport protocol. */ enum transports transport_proto; /** Current local value. */ double local_rate; /** Previous local value. */ double last_local_rate; double rate_change; /** The number of remote nodes in the identity */ uint32_t remote_node_count; remote_node_t *remote_nodes; /** Array containing all known remote limiters in this identity. * Contains the same information as the remote_node_map. */ remote_limiter_t *remote_limiters; /** Hash map containing all the remote limiters in this identity. * Indexed by the remote_node_t used to create the remote_limiter_t. * Maps to the appropriate element of the remote_limiters array. */ map_handle remote_node_map; /** A mutex to protect the comm structure. */ pthread_mutex_t lock; /** Gossip values for our local identity */ gossip_t gossip; /** Function pointer to send function. */ int (*send_function)(struct comm *comm, uint32_t id, int sock); #if 0 /** Thread for handling incoming TCP data. */ pthread_t tcp_recv_thread; /** Descriptor set for reading TCP messages */ fd_set fds; #endif /** Array of integers specifiying which nodes, if any, have outstanding * unacked data. When nodes fall in this category, and it's time to send, * these nodes will be chosen first. This only affects gossip. A * negative number means there is no retransmit necessary. Otherwise, the * value is the index into the remote_limiters array of the necessary * retransmit. */ int *retrys; } comm_t; typedef struct message { uint32_t magic; uint32_t ident_id; uint32_t seqno; uint32_t min_seqno; double value; double weight; uint16_t type; } message_t; typedef struct hello_message { uint32_t magic; uint32_t ident_id; uint16_t port; } hello_t; #if 0 struct recv_thread_args { comm_ident_t *ident; pthread_rwlock_t *lock; uint16_t port; }; #endif #if 0 /** * Initializes the global limiter. * * @param ipaddr The IP address on which the limiter should listen. * INADDR_ANY will suffice. Should be specified in network byte order. * * @param port The port on which the limiter should listen. Should be specified * in network byte order. */ void init_limiter(const in_addr_t ipaddr, const in_port_t port); /** * Deallocates the entire global limiter. */ void destroy_limiter(); #endif /** * Fills in the communication structure of an identity. * * @param comm The communication structure to be created/populated. * * @param config The configuration options for the identity. * * @param nodes An array of remote nodes belonging to this identity. * * @returns 0 on success, ENOMEM if memory cannot be allocated. */ int new_comm(comm_t *comm, ident_config *config, remote_node_t *nodes); /** * Frees the memory associated with an identity's communication structure. * * @param comm The communication structure to free. */ void free_comm(comm_t *comm); /** * Calculates and reads the current aggregate value for an identity. * This value includes the locally observed value. * * @param comm The comm structure of the identity in question. * * @param aggregate The location at which the aggregate value will * be stored. * * @returns 0 on success, EINVAL on error. */ int read_comm(comm_t *comm, double *aggregate); /** * Updates the locally observed value of an identity. * * @param comm The comm structure of the identity to update. * * @param value The new locally observed value. * * @returns 0 on success, EINVAL on error. */ int write_local_value(comm_t *comm, const double value); /** * Sends the local state information to one or more peer limiters in the same * identity. If the identity is configured as a mesh, it will send to all * peers. If the identity is configured using gossip, it will send to the * number of peers specified by the gossip_branch field of the comm_config_t * that was used to configure the identity. * * @param comm The communication structure of the identity whose value should * be propagated. * * @param id The unique id of the identity that is sending. * * @returns 0 on success, ENOMEM if there was not enough memory, or possibly * other E values as a result of socket-related results. */ int send_update(comm_t *comm, uint32_t id); /** * Thread that is responsible for receiving data from other limiters. * * @param limiter The limiter_t that is to be doing the receiving. */ void *limiter_receive_thread(void *unused); #endif /* _DRL_STATE_ */