X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=drl%2Fdrl_state.h;h=4fb710e064f38da210c17bb0167f90f96c69cafe;hb=8675c0b77ad3e361f4255ce61881a79061c5238d;hp=2b1ec5e68282fb867b31bd0b9721c665653ce4eb;hpb=0be9704d6b24d09ebd55beedec52758cb88c570b;p=distributedratelimiting.git diff --git a/drl/drl_state.h b/drl/drl_state.h index 2b1ec5e..4fb710e 100644 --- a/drl/drl_state.h +++ b/drl/drl_state.h @@ -30,14 +30,23 @@ #define MAX_IDENTS (1024) #define MAX_LIMITERS (128) +#define TRUE (1) +#define FALSE (0) -#define REMOTE_AWOL_THRESHOLD (3) - -enum transports { UDP, TCP }; +enum transports { UDP = 0, TCP = 1 }; +enum view_confidences { IN = 0, NOTIN = 1, UNSURE = 2 }; +enum reachabilities { REACHABLE = 0, SUSPECT = 1, UNREACHABLE = 2 }; typedef struct gossipval { + /* Fields that don't change. */ int gossip_branch; - double last_nonzero; + enum memberships membership; + enum failure_behaviors failure_behavior; + + /* Fields that change only on restart. */ + int32_t view; + + /* Fields that change frequently. */ double value; double weight; } gossip_t; @@ -60,6 +69,7 @@ typedef struct remote_node { in_port_t port; } remote_node_t; +//TODO: Clean this up typedef struct remote_limiter { /** The last known value at the remote limiter. */ double rate; @@ -68,19 +78,62 @@ typedef struct remote_limiter { out_neighbor_t outgoing; /* Socket to contact this remote limiter, if using TCP. */ - int socket; + //int socket; + /** IP address of the remote limiter, in network byte order. */ in_addr_t addr; in_port_t port; - /** Flag to keep track of situations in which we read from this node's - * value more than once before receiving an update from it. We use this - * value to know when it's safe to begin decaying the remote node's value - * (because we assume that it has failed). */ + /** Keeps track of the number of messages we have sent to this peer without + * having heard from them. */ int awol; + /** Whether or not we think this peer is reachable. */ + enum reachabilities reachability; + + /**Count of the rounds since doubt has risen and count of friends which + * suspect this node to be awol or alive*/ + int count_rounds; + int count_awol; + int count_alive; + + uint32_t incarnation; + + int32_t view; + enum view_confidences view_confidence; } remote_limiter_t; +//TODO: Reduce the size of this? +typedef struct message { + uint32_t magic; + uint32_t ident_id; + double value; + double weight; + uint32_t seqno; + uint32_t min_seqno; + uint16_t type; + + /** tell ping target the address of node which requested ping */ + in_addr_t ping_source; + in_port_t ping_port; + /** friend needs to be told the address of node suspected to be down */ + in_addr_t check_target; + in_port_t check_port; + /** friend responds with ALIVE / AWOL */ + uint32_t checkack_value; + /*Whether the message has an update piggy backed onto it*/ + uint32_t update_present; // TRUE or FALSE + /*Node is reachable or not*/ + uint32_t reachability; + /*Incarnation number of the node whose update + * is being sent piggy backed on the message*/ + uint32_t incarnation; + /*Address of the node whose update is being sent*/ + remote_node_t node; + + uint32_t view; +} message_t; + typedef struct comm { /** Communication policy. (COMM_MESH, COMM_GOSSIP) */ enum commfabrics comm_fabric; @@ -94,8 +147,6 @@ typedef struct comm { /** Previous local value. */ double last_local_rate; - double rate_change; - /** The number of remote nodes in the identity */ uint32_t remote_node_count; @@ -119,65 +170,41 @@ typedef struct comm { /** Function pointer to send function. */ int (*send_function)(struct comm *comm, uint32_t id, int sock); -#if 0 - /** Thread for handling incoming TCP data. */ - pthread_t tcp_recv_thread; + /** Function pointer to recv function for group membership. When a message + * is received, it is proccessed normally and then handed to this function + * in case additional processing is necessary for group membership. */ + int (*recv_function)(struct comm *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg); - /** Descriptor set for reading TCP messages */ - fd_set fds; -#endif + /** Function to restart the communication protocol. */ + void (*restart_function)(struct comm *comm, int32_t view_number); - /** Array of integers specifiying which nodes, if any, have outstanding - * unacked data. When nodes fall in this category, and it's time to send, - * these nodes will be chosen first. This only affects gossip. A - * negative number means there is no retransmit necessary. Otherwise, the - * value is the index into the remote_limiters array of the necessary - * retransmit. */ - int *retrys; + /** Flag indicating whether or not we are "connected" to the group + * membership service. This can only be false for membership schemes that + * require a persistent connection (Zookeeper). */ + int connected; -} comm_t; + /** Array of integers specifiying which nodes have been selected for + * message transmissions during the current round. */ + int *selected; -typedef struct message { - uint32_t magic; - uint32_t ident_id; - uint32_t seqno; - uint32_t min_seqno; - double value; - double weight; - uint16_t type; -} message_t; - -typedef struct hello_message { - uint32_t magic; - uint32_t ident_id; - uint16_t port; -} hello_t; + /** Array of indicies into remote_limiters. Used to keep a shuffled + * ordering for future gossip targets. */ + int *indices; + + /** The next index to use for target peer selection. The indicies are + * re-shuffled when this reaches remote_node_count. */ + uint32_t shuffle_index; -#if 0 -struct recv_thread_args { - comm_ident_t *ident; - pthread_rwlock_t *lock; - uint16_t port; -}; -#endif + void *membership_state; #if 0 -/** - * Initializes the global limiter. - * - * @param ipaddr The IP address on which the limiter should listen. - * INADDR_ANY will suffice. Should be specified in network byte order. - * - * @param port The port on which the limiter should listen. Should be specified - * in network byte order. - */ -void init_limiter(const in_addr_t ipaddr, const in_port_t port); + /** Thread for handling incoming TCP data. */ + pthread_t tcp_recv_thread; -/** - * Deallocates the entire global limiter. - */ -void destroy_limiter(); + /** Descriptor set for reading TCP messages */ + fd_set fds; #endif +} comm_t; /** * Fills in the communication structure of an identity. @@ -203,14 +230,9 @@ void free_comm(comm_t *comm); * Calculates and reads the current aggregate value for an identity. * This value includes the locally observed value. * - * @param comm The comm structure of the identity in question. - * - * @param aggregate The location at which the aggregate value will - * be stored. - * * @returns 0 on success, EINVAL on error. */ -int read_comm(comm_t *comm, double *aggregate); +int read_comm(double *aggregate, uint32_t *effective_global, comm_t *comm, uint32_t global_limit); /** * Updates the locally observed value of an identity. @@ -247,4 +269,34 @@ int send_update(comm_t *comm, uint32_t id); */ void *limiter_receive_thread(void *unused); +#if 0 +typedef struct hello_message { + uint32_t magic; + uint32_t ident_id; + uint16_t port; +} hello_t; + +struct recv_thread_args { + comm_ident_t *ident; + pthread_rwlock_t *lock; + uint16_t port; +}; + +/** + * Initializes the global limiter. + * + * @param ipaddr The IP address on which the limiter should listen. + * INADDR_ANY will suffice. Should be specified in network byte order. + * + * @param port The port on which the limiter should listen. Should be specified + * in network byte order. + */ +void init_limiter(const in_addr_t ipaddr, const in_port_t port); + +/** + * Deallocates the entire global limiter. + */ +void destroy_limiter(); +#endif + #endif /* _DRL_STATE_ */