#define MAX_IDENTS (1024)
#define MAX_LIMITERS (128)
+#define TRUE (1)
+#define FALSE (0)
-#define REMOTE_AWOL_THRESHOLD (5)
-
-enum transports { UDP, TCP };
+enum transports { UDP = 0, TCP = 1 };
+enum view_confidences { IN = 0, NOTIN = 1, UNSURE = 2 };
+enum reachabilities { REACHABLE = 0, SUSPECT = 1, UNREACHABLE = 2 };
typedef struct gossipval {
+ /* Fields that don't change. */
int gossip_branch;
- double last_nonzero;
+ enum memberships membership;
+ enum failure_behaviors failure_behavior;
+
+ /* Fields that change only on restart. */
+ int32_t view;
+
+ /* Fields that change frequently. */
double value;
double weight;
} gossip_t;
in_port_t port;
} remote_node_t;
+//TODO: Clean this up
typedef struct remote_limiter {
/** The last known value at the remote limiter. */
double rate;
out_neighbor_t outgoing;
/* Socket to contact this remote limiter, if using TCP. */
- int socket;
+ //int socket;
+ /** IP address of the remote limiter, in network byte order. */
in_addr_t addr;
in_port_t port;
- /** Flag to keep track of situations in which we read from this node's
- * value more than once before receiving an update from it. We use this
- * value to know when it's safe to begin decaying the remote node's value
- * (because we assume that it has failed). */
+ /** Keeps track of the number of messages we have sent to this peer without
+ * having heard from them. */
int awol;
+ /** Whether or not we think this peer is reachable. */
+ enum reachabilities reachability;
+
+ /** Count of the rounds since doubt arose, and counts of the friends that
+ * believe this node to be AWOL or alive. */
+ int count_rounds;
+ int count_awol;
+ int count_alive;
+
+ uint32_t incarnation;
+
+ int32_t view;
+ enum view_confidences view_confidence;
} remote_limiter_t;
+//TODO: Reduce the size of this?
+typedef struct message {
+ uint32_t magic;
+ uint32_t ident_id;
+ double value;
+ double weight;
+ uint32_t seqno;
+ uint32_t min_seqno;
+ uint16_t type;
+
+ /** Address and port of the node that requested the ping; tells the ping target who asked. */
+ in_addr_t ping_source;
+ in_port_t ping_port;
+ /** Address and port of the node suspected to be down; tells the friend which node to check. */
+ in_addr_t check_target;
+ in_port_t check_port;
+ /** The friend's response to a check request: ALIVE or AWOL. */
+ uint32_t checkack_value;
+ /** Whether the message has an update piggybacked onto it. */
+ uint32_t update_present; // TRUE or FALSE
+ /** Whether the node is reachable or not (presumably an enum reachabilities value -- TODO confirm). */
+ uint32_t reachability;
+ /** Incarnation number of the node whose update
+ * is being sent piggybacked on the message. */
+ uint32_t incarnation;
+ /** Address of the node whose update is being sent. */
+ remote_node_t node;
+
+ uint32_t view;
+} message_t;
+
typedef struct comm {
/** Communication policy. (COMM_MESH, COMM_GOSSIP) */
enum commfabrics comm_fabric;
/** Previous local value. */
double last_local_rate;
- double rate_change;
-
/** The number of remote nodes in the identity */
uint32_t remote_node_count;
/** Function pointer to send function. */
int (*send_function)(struct comm *comm, uint32_t id, int sock);
-#if 0
- /** Thread for handling incoming TCP data. */
- pthread_t tcp_recv_thread;
+ /** Function pointer to recv function for group membership. When a message
+ * is received, it is processed normally and then handed to this function
+ * in case additional processing is necessary for group membership. */
+ int (*recv_function)(struct comm *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg);
- /** Descriptor set for reading TCP messages */
- fd_set fds;
-#endif
+ /** Function to restart the communication protocol. */
+ void (*restart_function)(struct comm *comm, int32_t view_number);
- /** Array of integers specifiying which nodes, if any, have outstanding
- * unacked data. When nodes fall in this category, and it's time to send,
- * these nodes will be chosen first. This only affects gossip. A
- * negative number means there is no retransmit necessary. Otherwise, the
- * value is the index into the remote_limiters array of the necessary
- * retransmit. */
- int *retrys;
+ /** Flag indicating whether or not we are "connected" to the group
+ * membership service. This can only be false for membership schemes that
+ * require a persistent connection (Zookeeper). */
+ int connected;
-} comm_t;
+ /** Array of integers specifying which nodes have been selected for
+ * message transmissions during the current round. */
+ int *selected;
-typedef struct message {
- uint32_t magic;
- uint32_t ident_id;
- uint32_t seqno;
- uint32_t min_seqno;
- double value;
- double weight;
- uint16_t type;
-} message_t;
-
-typedef struct hello_message {
- uint32_t magic;
- uint32_t ident_id;
- uint16_t port;
-} hello_t;
+ /** Array of indices into remote_limiters. Used to keep a shuffled
+ * ordering for future gossip targets. */
+ int *indices;
+
+ /** The next index to use for target peer selection. The indices are
+ * re-shuffled when this reaches remote_node_count. */
+ uint32_t shuffle_index;
-#if 0
-struct recv_thread_args {
- comm_ident_t *ident;
- pthread_rwlock_t *lock;
- uint16_t port;
-};
-#endif
+ void *membership_state;
#if 0
-/**
- * Initializes the global limiter.
- *
- * @param ipaddr The IP address on which the limiter should listen.
- * INADDR_ANY will suffice. Should be specified in network byte order.
- *
- * @param port The port on which the limiter should listen. Should be specified
- * in network byte order.
- */
-void init_limiter(const in_addr_t ipaddr, const in_port_t port);
+ /** Thread for handling incoming TCP data. */
+ pthread_t tcp_recv_thread;
-/**
- * Deallocates the entire global limiter.
- */
-void destroy_limiter();
+ /** Descriptor set for reading TCP messages */
+ fd_set fds;
#endif
+} comm_t;
/**
* Fills in the communication structure of an identity.
* Calculates and reads the current aggregate value for an identity.
* This value includes the locally observed value.
*
- * @param comm The comm structure of the identity in question.
- *
- * @param aggregate The location at which the aggregate value will
- * be stored.
- *
- * @param decayto When using a mesh comm fabric, limiters whose value
- * has not been heard in several timesteps will decay to this value.
- * Generally globallimit/N.
- *
* @returns 0 on success, EINVAL on error.
*/
-int read_comm(comm_t *comm, double *aggregate, double decayto);
+int read_comm(double *aggregate, uint32_t *effective_global, comm_t *comm, uint32_t global_limit);
/**
* Updates the locally observed value of an identity.
*/
void *limiter_receive_thread(void *unused);
+#if 0
+typedef struct hello_message {
+ uint32_t magic;
+ uint32_t ident_id;
+ uint16_t port;
+} hello_t;
+
+struct recv_thread_args {
+ comm_ident_t *ident;
+ pthread_rwlock_t *lock;
+ uint16_t port;
+};
+
+/**
+ * Initializes the global limiter.
+ *
+ * @param ipaddr The IP address on which the limiter should listen.
+ * INADDR_ANY will suffice. Should be specified in network byte order.
+ *
+ * @param port The port on which the limiter should listen. Should be specified
+ * in network byte order.
+ */
+void init_limiter(const in_addr_t ipaddr, const in_port_t port);
+
+/**
+ * Deallocates the entire global limiter.
+ */
+void destroy_limiter();
+#endif
+
#endif /* _DRL_STATE_ */