/* See the DRL-LICENSE file for this file's software license. */ #ifndef _DRL_STATE_ #define _DRL_STATE_ #define _XOPEN_SOURCE 600 /* FILE */ #include /* uint32_t */ #include #include /* in_addr_t, in_port_t */ #include /* pthread functions. */ #include /* fd_set */ #include /* Hash map types/functions. */ #ifdef STANDALONE #include "../util.h" #else #include "util.h" #endif #define MAX_IDENTS (1024) #define MAX_LIMITERS (128) #define TRUE (1) #define FALSE (0) enum transports { UDP = 0, TCP = 1 }; enum view_confidences { IN = 0, NOTIN = 1, UNSURE = 2 }; enum reachabilities { REACHABLE = 0, SUSPECT = 1, UNREACHABLE = 2 }; typedef struct gossipval { /* Fields that don't change. */ int gossip_branch; enum memberships membership; enum failure_behaviors failure_behavior; /* Fields that change only on restart. */ int32_t view; /* Fields that change frequently. */ double value; double weight; } gossip_t; typedef struct out_neighbor { uint32_t next_seqno; uint32_t first_seqno; double saved_value; double saved_weight; } out_neighbor_t; typedef struct in_neighbor { uint32_t seen_seqno; double saved_value; double saved_weight; } in_neighbor_t; typedef struct remote_node { in_addr_t addr; in_port_t port; } remote_node_t; //TODO: Clean this up typedef struct remote_limiter { /** The last known value at the remote limiter. */ double rate; in_neighbor_t incoming; out_neighbor_t outgoing; /* Socket to contact this remote limiter, if using TCP. */ //int socket; /** IP address of the remote limiter, in network byte order. */ in_addr_t addr; in_port_t port; /** Keeps track of the number of messages we have sent to this peer without * having heard from them. */ int awol; /** Whether or not we think this peer is reachable. */ enum reachabilities reachability; /**Count of the rounds since doubt has risen and count of friends which * suspect this node to be awol or alive*/ int count_rounds; int count_awol; int count_alive; uint32_t incarnation; int32_t view; enum view_confidences view_confidence; } remote_limiter_t; //TODO: Reduce the size of this? typedef struct message { uint32_t magic; uint32_t ident_id; double value; double weight; uint32_t seqno; uint32_t min_seqno; uint16_t type; /** tell ping target the address of node which requested ping */ in_addr_t ping_source; in_port_t ping_port; /** friend needs to be told the address of node suspected to be down */ in_addr_t check_target; in_port_t check_port; /** friend responds with ALIVE / AWOL */ uint32_t checkack_value; /*Whether the message has an update piggy backed onto it*/ uint32_t update_present; // TRUE or FALSE /*Node is reachable or not*/ uint32_t reachability; /*Incarnation number of the node whose update * is being sent piggy backed on the message*/ uint32_t incarnation; /*Address of the node whose update is being sent*/ remote_node_t node; uint32_t view; } message_t; typedef struct comm { /** Communication policy. (COMM_MESH, COMM_GOSSIP) */ enum commfabrics comm_fabric; /** Transport protocol. */ enum transports transport_proto; /** Current local value. */ double local_rate; /** Previous local value. */ double last_local_rate; /** The number of remote nodes in the identity */ uint32_t remote_node_count; remote_node_t *remote_nodes; /** Array containing all known remote limiters in this identity. * Contains the same information as the remote_node_map. */ remote_limiter_t *remote_limiters; /** Hash map containing all the remote limiters in this identity. * Indexed by the remote_node_t used to create the remote_limiter_t. * Maps to the appropriate element of the remote_limiters array. */ map_handle remote_node_map; /** A mutex to protect the comm structure. */ pthread_mutex_t lock; /** Gossip values for our local identity */ gossip_t gossip; /** Function pointer to send function. */ int (*send_function)(struct comm *comm, uint32_t id, int sock); /** Function pointer to recv function for group membership. When a message * is received, it is proccessed normally and then handed to this function * in case additional processing is necessary for group membership. */ int (*recv_function)(struct comm *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg); /** Function to restart the communication protocol. */ void (*restart_function)(struct comm *comm, int32_t view_number); /** Flag indicating whether or not we are "connected" to the group * membership service. This can only be false for membership schemes that * require a persistent connection (Zookeeper). */ int connected; /** Array of integers specifiying which nodes have been selected for * message transmissions during the current round. */ int *selected; /** Array of indicies into remote_limiters. Used to keep a shuffled * ordering for future gossip targets. */ int *indices; /** The next index to use for target peer selection. The indicies are * re-shuffled when this reaches remote_node_count. */ uint32_t shuffle_index; void *membership_state; #if 0 /** Thread for handling incoming TCP data. */ pthread_t tcp_recv_thread; /** Descriptor set for reading TCP messages */ fd_set fds; #endif } comm_t; /** * Fills in the communication structure of an identity. * * @param comm The communication structure to be created/populated. * * @param config The configuration options for the identity. * * @param nodes An array of remote nodes belonging to this identity. * * @returns 0 on success, ENOMEM if memory cannot be allocated. */ int new_comm(comm_t *comm, ident_config *config, remote_node_t *nodes); /** * Frees the memory associated with an identity's communication structure. * * @param comm The communication structure to free. */ void free_comm(comm_t *comm); /** * Calculates and reads the current aggregate value for an identity. * This value includes the locally observed value. * * @returns 0 on success, EINVAL on error. */ int read_comm(double *aggregate, uint32_t *effective_global, comm_t *comm, uint32_t global_limit); /** * Updates the locally observed value of an identity. * * @param comm The comm structure of the identity to update. * * @param value The new locally observed value. * * @returns 0 on success, EINVAL on error. */ int write_local_value(comm_t *comm, const double value); /** * Sends the local state information to one or more peer limiters in the same * identity. If the identity is configured as a mesh, it will send to all * peers. If the identity is configured using gossip, it will send to the * number of peers specified by the gossip_branch field of the comm_config_t * that was used to configure the identity. * * @param comm The communication structure of the identity whose value should * be propagated. * * @param id The unique id of the identity that is sending. * * @returns 0 on success, ENOMEM if there was not enough memory, or possibly * other E values as a result of socket-related results. */ int send_update(comm_t *comm, uint32_t id); /** * Thread that is responsible for receiving data from other limiters. * * @param limiter The limiter_t that is to be doing the receiving. */ void *limiter_receive_thread(void *unused); #if 0 typedef struct hello_message { uint32_t magic; uint32_t ident_id; uint16_t port; } hello_t; struct recv_thread_args { comm_ident_t *ident; pthread_rwlock_t *lock; uint16_t port; }; /** * Initializes the global limiter. * * @param ipaddr The IP address on which the limiter should listen. * INADDR_ANY will suffice. Should be specified in network byte order. * * @param port The port on which the limiter should listen. Should be specified * in network byte order. */ void init_limiter(const in_addr_t ipaddr, const in_port_t port); /** * Deallocates the entire global limiter. */ void destroy_limiter(); #endif #endif /* _DRL_STATE_ */