@for d in $(SUBDIRS); do if ! make -C $$d; then exit 1; fi; done
ulogd: ulogd.c include/ulogd/ulogd.h ulogd.conf recurse
- $(CC) $(CFLAGS) -rdynamic $< conffile/conffile.o $(LIBIPULOG)/libipulog.a -o $@ $(LDFLAGS) $(LIBS) `xml2-config --libs`
+ $(CC) $(CFLAGS) -rdynamic $< conffile/conffile.o $(LIBIPULOG)/libipulog.a -o $@ $(LDFLAGS) $(LIBS) $(XML_LDFLAGS)
-edit = sed -e 's,@libdir\@,$(ULOGD_LIB_PATH),g'
+edit = sed -e 's,@libdir\@,$(ULOGD_LIB_PATH),g' -e 's,@etcdir\@,$(DESTDIR)$(ETCDIR),g'
ulogd.conf: ulogd.conf.in
$(edit) ulogd.conf.in > ulogd.conf
@INSTALL@ -D -m 755 ulogd $(DESTDIR)$(BINDIR)/ulogd
@[ -d $(DESTDIR)$(ETCDIR) ] || mkdir -p $(DESTDIR)$(ETCDIR)
@[ -f $(DESTDIR)$(ETCDIR)/ulogd.conf ] || @INSTALL@ -D -m 600 ulogd.conf $(DESTDIR)$(ETCDIR)/ulogd.conf
+ @[ -f $(DESTDIR)$(ETCDIR)/drl.xml ] || @INSTALL@ -D -m 600 drl.xml $(DESTDIR)$(ETCDIR)/drl.xml
doc:
$(MAKE) -C $@
LIBS=@LIBS@
-
# Names of the plugins to be compiled
ULOGD_SL:=BASE OPRINT PWSNIFF LOGEMU LOCAL SYSLOG
SQLITE3_CFLAGS=-I@SQLITE3INCLUDES@ @EXTRA_SQLITE3_DEF@
SQLITE3_LDFLAGS=@DATABASE_LIB_DIR@ @SQLITE3_LIB@
+XML_CFLAGS=@XMLINCLUDES@
+XML_LDFLAGS=@XMLLIBS@
+
+ZK_CFLAGS=@ZKFLAGS@
+ZK_LDFLAGS=@ZKLIBS@
dnl
AC_ARG_WITH(mysql,
--with-mysql=<directory> mysql installed in <directory>,[
-if test $withval == no
-then
- AC_MSG_WARN("mysql disabled.")
-else
if test $withval != yes
then
dir=$withval
AC_MSG_RESULT(found new MySQL)
fi
-fi
-fi
+fi
])
AC_MSG_WARN(the use of --with-mysql-log-ip-as-string is discouraged)
])
+dnl
+dnl test for libxml2
+dnl
+AC_ARG_WITH(libxml2, --with-libxml=<directory> libxml2 installed in <directory>,[
+if test $withval != yes
+then
+ dir=$withval
+else
+ dir="/usr/local"
+fi])
+
+libxmldir=""
+AC_MSG_CHECKING(for LIBXML2 files)
+for d in $dir/bin /usr/bin /usr/local/bin /usr/local/libxml2/bin /opt/libxml2/bin /opt/packages/libxml2/bin
+do
+ if test -x $d/xml2-config
+ then
+ AC_MSG_RESULT(found xml2-config in $d)
+ libxmldir=$d
+ break
+ fi
+done
+
+if test x$libxmldir = x
+then
+ AC_MSG_ERROR(xml2-config not found)
+else
+ XMLINCLUDES=`$libxmldir/xml2-config --cflags`
+ AC_SUBST(XMLINCLUDES)
+
+ XMLLIBS=`$libxmldir/xml2-config --libs`
+ AC_SUBST(XMLLIBS)
+fi
+
+dnl
+dnl check for zookeeper library
+dnl
+zkdir="/usr/local/lib"
+
+AC_ARG_WITH(zookeeper, --with-zookeeperlib=<directory> zookeeper shared object located in <directory>,[
+if test $withval = no
+then
+ zkdir=no
+ AC_MSG_WARN(Building without zookeeper support.)
+else
+ zkdir=$withval
+fi])
+
+zklib=""
+if test $zkdir != no
+then
+ AC_MSG_CHECKING(for zookeeper libraries)
+ for d in $zkdir /usr/local/lib /lib /usr/lib
+ do
+ if test -f $d/libzookeeper_mt.so
+ then
+ AC_MSG_RESULT(found libzookeeper_mt.so in $d)
+ zklib=$d/libzookeeper_mt.so
+ break
+ fi
+ done
+
+ if test x$zklib = x
+ then
+ dnl no zklib
+ AC_MSG_WARN(Zookeeper libraries not found.)
+ ZKLIBS=""
+ AC_SUBST(ZKLIBS)
+
+ ZKFLAGS=""
+ AC_SUBST(ZKFLAGS)
+ else
+ dnl found it
+ ZKLIBS=$zklib
+ AC_SUBST(ZKLIBS)
+
+ ZKFLAGS="-DTHREADED -DBUILD_ZOOKEEPER"
+ AC_SUBST(ZKFLAGS)
+ fi
+fi
dnl
dnl test for PostgreSQL
<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
<drl>
- <machine id="11" limit="10" commfabric="MESH" branch="0" accounting="STANDARD" ewma="0.1" intervals="1">
- <peer>127.0.0.1</peer>
+ <machine id="11" limit="10240" commfabric="MESH" failure_behavior="QUORUM" accounting="STANDARD" ewma="0.1">
+ <peer>x.x.x.x</peer>
+ <peer>y.y.y.y</peer>
</machine>
</drl>
+-->
#
include @top_srcdir@/Rules.make
-CFLAGS+=-I@top_srcdir@ -I@top_srcdir@/libipulog/include -I@top_srcdir@/include `xml2-config --cflags`
+CFLAGS+=-I@top_srcdir@ -I@top_srcdir@/libipulog/include -I@top_srcdir@/include -I@top_srcdir@/include/zookeeper $(XML_CFLAGS) $(ZK_CFLAGS)
SH_CFLAGS:=$(CFLAGS) -fPIC
# Normally You should not need to change anything below
#
SHARED_LIBS=ulogd_DRL.so
-OBJECTS=config.o drl_state.o estimate.o logging.o multipleinterval.o peer_comm.o samplehold.o simple.o standard.o ulogd_DRL.o util.o
+OBJECTS=config.o drl_state.o estimate.o logging.o multipleinterval.o peer_comm.o samplehold.o simple.o standard.o swim.o ulogd_DRL.o util.o zk_drl.o
all: $(SHARED_LIBS)
distrib:
$(SHARED_LIBS): $(OBJECTS)
- $(LD) $(LDFLAGS) -shared -o $@ $(OBJECTS) -lc
+ $(LD) $(LDFLAGS) -shared -o $@ $(OBJECTS) -lc $(ZK_LDFLAGS)
%.o: %.c
$(CC) $(CFLAGS) -c $<
xmlChar *limit;
xmlChar *commfabric;
xmlChar *branch;
+ xmlChar *membership;
+ xmlChar *failure_behavior;
xmlChar *accounting;
xmlChar *ewma;
xmlChar *mainloop_intervals;
xmlNodePtr fields = ident->children;
ident_peer *current = NULL;
+ /* The struct has already been zeroed with memset; these NULLs are just to be safe. */
+ common->zk_host = NULL;
+ common->peers = NULL;
+ common->members = NULL;
+ common->next = NULL;
+
/* Make sure no required fields are missing. */
id = xmlGetProp(ident, (const xmlChar *) "id");
if (id == NULL) {
xmlFree(commfabric);
}
- /* Only care about branching factor if we're using gossip. */
+ /* Only care about branching factor and failure detector if we're using gossip. */
if (common->commfabric == COMM_GOSSIP) {
branch = xmlGetProp(ident, (const xmlChar *) "branch");
if (branch == NULL) {
common->branch = atoi((const char *) branch);
xmlFree(branch);
}
+
+ membership = xmlGetProp(ident, (const xmlChar *) "membership");
+ if (membership == NULL) {
+ printlog(LOG_CRITICAL, "Ident missing membership protocol selection.\n");
+ return EINVAL;
+ } else {
+ if (!xmlStrcmp(membership, (const xmlChar *) "SWIM")) {
+ common->membership = SWIM;
+ } else if (!xmlStrcmp(membership, (const xmlChar *) "ZOOKEEPER")) {
+#ifdef BUILD_ZOOKEEPER
+ common->membership = ZOOKEEPER;
+#else
+ printlog(LOG_CRITICAL, "Zookeeper requested, but support not compiled into DRL at configure time.\n");
+ xmlFree(membership);
+ return EINVAL;
+#endif
+ } else {
+ printlog(LOG_CRITICAL, "Unknown/invalid gossip group membership protocol.\n");
+ xmlFree(membership);
+ return EINVAL;
+ }
+ xmlFree(membership);
+ }
+
+ failure_behavior = xmlGetProp(ident, (const xmlChar *) "failure_behavior");
+ if (failure_behavior == NULL) {
+ printlog(LOG_CRITICAL, "Ident missing failure handling behavior.\n");
+ return EINVAL;
+ } else {
+ if (!xmlStrcmp(failure_behavior, (const xmlChar *) "PANIC")) {
+ common->failure_behavior = PANIC;
+ } else if (!xmlStrcmp(failure_behavior, (const xmlChar *) "QUORUM")) {
+ common->failure_behavior = QUORUM;
+ } else {
+ printlog(LOG_CRITICAL, "Unknown/invalid gossip failure behavior policy.\n");
+ xmlFree(failure_behavior);
+ return EINVAL;
+ }
+ xmlFree(failure_behavior);
+ }
}
accounting = xmlGetProp(ident, (const xmlChar *) "accounting");
current->next = NULL;
}
xmlFree(ip);
+ } else if ((!xmlStrcmp(fields->name, (const xmlChar *) "zkhost"))) {
+ xmlChar *host = xmlNodeListGetString(doc, fields->children, 1);
+
+ common->zk_host = strdup((const char *) host);
+ if (common->zk_host == NULL) {
+ return ENOMEM;
+ }
+
+ xmlFree(host);
}
fields = fields->next;
}
return EINVAL;
}
+ if (common->membership == ZOOKEEPER && common->zk_host == NULL) {
+ printlog(LOG_CRITICAL, "Group membership protocol ZOOKEEPER requires a zkhost field.\n");
+ return EINVAL;
+ }
+
/* No errors. */
return 0;
}
/** The gossip branch factor (when commfabric is COMM_GOSSIP). */
int branch;
+ /** The gossip group membership policy (SWIM, ZOOKEEPER). */
+ enum memberships membership;
+
+ /** The behavioral policy to use when one or more failures in group
+ * membership are detected. */
+ enum failure_behaviors failure_behavior;
+
+#ifdef BUILD_ZOOKEEPER
+
+ /** The host string that should be passed to zookeeper_init when using
+ * zookeeper. This consists of comma-separated ipaddr:port pairs. Example:
+ * "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" */
+ char *zk_host;
+
+#endif
+
/** The flow accounting mechanism to be used by this identity. */
enum accountings accounting;
#include "ratetypes.h"
#include "drl_state.h"
#include "peer_comm.h"
+#include "swim.h"
#include "logging.h"
+#ifdef BUILD_ZOOKEEPER
+ #include "zk_drl.h"
+#endif
+
extern limiter_t limiter;
+static int group_membership_init(comm_t *comm, uint32_t id, ident_config *config) {
+ switch (comm->gossip.membership) {
+ case SWIM:
+ return swim_init(comm, id);
+ break;
+
+#ifdef BUILD_ZOOKEEPER
+
+ case ZOOKEEPER:
+ return zk_drl_init(comm, id, &limiter, config);
+ break;
+
+#endif
+
+ default:
+ printlog(LOG_CRITICAL, "drl_state.c: This shouldn't happen!\n");
+ return EINVAL;
+ }
+}
+
+static void group_membership_teardown(comm_t *comm) {
+ switch (comm->gossip.membership) {
+ case SWIM:
+ swim_teardown(comm);
+ break;
+
+#ifdef BUILD_ZOOKEEPER
+
+ case ZOOKEEPER:
+ zk_drl_close(comm);
+ break;
+
+#endif
+
+ default:
+ printlog(LOG_CRITICAL, "drl_state.c: This shouldn't happen!\n");
+ }
+}
+
+void null_restart_function(comm_t *comm, int32_t view_number) {
+ /* Intentionally empty. */
+}
+
int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) {
int i;
+ int result = 0;
memset(comm, 0, sizeof(comm_t));
comm->transport_proto = UDP;
comm->remote_node_count = config->peer_count;
comm->gossip.gossip_branch = config->branch;
+ comm->gossip.membership = config->membership;
+ comm->gossip.failure_behavior = config->failure_behavior;
comm->gossip.weight = 1.0;
pthread_mutex_init(&comm->lock, NULL);
-
- /* Set send function. */
+
+ // Allocate memory for the gossip-target index array.
+ comm->indices = (int*) malloc(sizeof(int)*comm->remote_node_count);
+ memset(comm->indices, 0, sizeof(int)*comm->remote_node_count);
+ for(i = 0; i < comm->remote_node_count; i++)
+ comm->indices[i] = i;
+ comm->shuffle_index = comm->remote_node_count;
+
+ /* Set default comm function pointers. These may get overwritten later
+ * by more specific initialization routines such as group membership
+ * init calls. */
switch (config->commfabric) {
case COMM_MESH:
comm->send_function = send_udp_mesh;
+ comm->recv_function = recv_mesh;
+ comm->restart_function = null_restart_function;
break;
case COMM_GOSSIP:
comm->send_function = send_udp_gossip;
+ comm->recv_function = recv_gossip;
+ comm->restart_function = null_restart_function;
break;
}
}
/* Allocate remote_limiters array and fill it in. Add remotes to map. */
- comm->remote_limiters =
- malloc(config->peer_count * sizeof(remote_limiter_t));
+ comm->remote_limiters = malloc(config->peer_count * sizeof(remote_limiter_t));
if (comm->remote_limiters == NULL) {
pthread_mutex_destroy(&comm->lock);
comm->remote_limiters[i].addr = remote_nodes[i].addr;
comm->remote_limiters[i].port = remote_nodes[i].port;
comm->remote_limiters[i].outgoing.next_seqno = 1;
+ comm->remote_limiters[i].reachability = REACHABLE;
+ comm->remote_limiters[i].awol = 0;
+ comm->remote_limiters[i].count_rounds = 0;
+ comm->remote_limiters[i].count_awol = 0;
+ comm->remote_limiters[i].count_alive = 0;
map_insert(comm->remote_node_map, (void *) &(remote_nodes[i]),
sizeof(remote_node_t), &comm->remote_limiters[i]);
}
-
- /* Allocate and initialize retrys. */
- comm->retrys = malloc(config->branch * sizeof(int));
- if (comm->retrys == NULL) {
+
+ /* Allocate and initialize selected. */
+ comm->selected = malloc(config->branch * sizeof(int));
+ if (comm->selected == NULL) {
pthread_mutex_destroy(&comm->lock);
free_map(comm->remote_node_map, 0);
free(comm->remote_limiters);
}
for (i = 0; i < config->branch; ++i) {
- comm->retrys[i] = -1;
+ comm->selected[i] = -1;
}
- return 0;
+ if (config->commfabric == COMM_GOSSIP) {
+ result = group_membership_init(comm, config->id, config);
+ if (result) {
+ pthread_mutex_destroy(&comm->lock);
+ free_map(comm->remote_node_map, 0);
+ free(comm->remote_limiters);
+ free(comm->selected);
+ }
+ }
+
+ return result;
}
void free_comm(comm_t *comm) {
if (comm) {
+ if (comm->comm_fabric == COMM_GOSSIP) {
+ group_membership_teardown(comm);
+ }
+
if (comm->remote_limiters) {
free(comm->remote_limiters);
}
pthread_mutex_destroy(&comm->lock);
- if (comm->retrys) {
- free(comm->retrys);
+ if (comm->selected) {
+ free(comm->selected);
}
}
}
-int read_comm(comm_t *comm, double *aggregate, double decayto) {
+int read_comm(double *aggregate, uint32_t *effective_global, comm_t *comm, uint32_t global_limit) {
remote_limiter_t *remote;
pthread_mutex_lock(&comm->lock);
if (comm->comm_fabric == COMM_MESH) {
*aggregate = 0;
+ *effective_global = global_limit;
map_reset_iterate(comm->remote_node_map);
while ((remote = map_next(comm->remote_node_map))) {
- /* remote->rate corresponds to the rate (GRD) or weight (FPS)
- * in generated by the peer remote. */
- *aggregate += remote->rate;
-
- /* If we continue to read it without having heard an update,
- * we start to make the peer's value approach decayto, getting
- * half of the way there each time. */
- if (remote->awol >= MESH_REMOTE_AWOL_THRESHOLD) {
+ if (remote->reachability != REACHABLE) {
printlog(LOG_WARN, "AWOL remote limiter detected.\n");
- remote->rate += ((decayto - remote->rate) / 2);
+ *effective_global -= (global_limit / (comm->remote_node_count + 1));
} else {
- remote->awol++;
+ /* remote->rate corresponds to the rate (GRD) or weight (FPS)
+ * generated by the peer remote. */
+ *aggregate += remote->rate;
}
}
*aggregate += comm->local_rate;
} else if (comm->comm_fabric == COMM_GOSSIP) {
- int i;
- int threshold = GOSSIP_REMOTE_AWOL_THRESHOLD;
double value = 0;
+ int i;
value = (comm->gossip.value / comm->gossip.weight);
value *= (comm->remote_node_count + 1);
- /* Keep around the last value so that we don't stupidly pick 0 when
- * we're negative. If we pick 0, it looks to the limiter like it
- * has free reign and it will take 100% of the rate allocation for
- * itself. This is a lie. Open question what to do here... FIXME: Use decayto?*/
- if (value <= 0) {
- //*aggregate = comm->gossip.last_nonzero;
- *aggregate = 0;
- printlog(LOG_DEBUG, "Gossip: Read aggregate of 0 from comm layer.\n");
- } else {
- *aggregate = value;
- comm->gossip.last_nonzero = *aggregate;
- printlog(LOG_DEBUG, "Gossip: Read aggregate of %.3f from comm layer.\n", value);
- }
-
- for (i = 0; i < comm->remote_node_count; ++i) {
- if (comm->remote_limiters[i].awol == threshold) {
- /* Re-claim any value/weight sent. */
- comm->gossip.value += comm->remote_limiters[i].outgoing.saved_value;
- comm->gossip.weight += comm->remote_limiters[i].outgoing.saved_weight;
+ /* Look up the failure handling policy and check to see if it
+ * is currently relevant. */
+ if (comm->gossip.failure_behavior == PANIC) {
+ int panic = 0;
+ if (!comm->connected) {
+ panic = 1;
+ }
- comm->remote_limiters[i].outgoing.saved_value = 0.0;
- comm->remote_limiters[i].outgoing.saved_weight = 0.0;
+ for (i = 0; i < comm->remote_node_count; ++i) {
+ if (comm->remote_limiters[i].reachability != REACHABLE) {
+ panic = 1;
+ }
+ }
- comm->remote_limiters[i].awol += 1;
- } else if (comm->remote_limiters[i].awol < threshold) {
- comm->remote_limiters[i].awol += 1;
+ if (panic) {
+ printlog(LOG_DEBUG, "GOSSIP: Panicking!\n");
+ *aggregate = comm->local_rate;
+ *effective_global = (global_limit / (comm->remote_node_count + 1));
+ } else {
+ *aggregate = (value > 0) ? value : 0;
+ *effective_global = global_limit;
+ }
+ } else if (comm->gossip.failure_behavior == QUORUM) {
+ *effective_global = global_limit;
+ if (comm->connected) {
+ for (i = 0; i < comm->remote_node_count; ++i) {
+ if (comm->remote_limiters[i].reachability != REACHABLE) {
+ *effective_global -= (global_limit / (comm->remote_node_count + 1));
+ }
+ }
+ *aggregate = (value > 0) ? value : 0;
+ } else {
+ /* Not part of the Quorum - do 1/n. */
+ printlog(LOG_DEBUG, "GOSSIP: Not in the quorum...Panicking!\n");
+ *aggregate = comm->local_rate;
+ *effective_global = (global_limit / (comm->remote_node_count + 1));
}
}
+ printlog(LOG_DEBUG, "GOSSIP: Read aggregate of %.3f from comm layer.\n", *aggregate);
} else {
printlog(LOG_CRITICAL, "read_comm: Invalid comm fabric: %d.\n",
comm->comm_fabric);
if (comm->comm_fabric == COMM_MESH) {
comm->last_local_rate = comm->local_rate;
comm->local_rate = value;
- comm->rate_change = comm->local_rate - comm->last_local_rate;
} else if (comm->comm_fabric == COMM_GOSSIP) {
comm->last_local_rate = comm->local_rate;
comm->local_rate = value;
- comm->rate_change = comm->local_rate - comm->last_local_rate;
- /*printf("new: %f, old: %f, weight: %f, diff: %f\n", comm->gossip.value + (comm->gossip.weight * comm->rate_change), comm->gossip.value, comm->gossip.weight, comm->rate_change);*/
- /*comm->gossip.value = comm->gossip.value + (comm->gossip.weight * comm->rate_change);*/
- comm->gossip.value += comm->rate_change;
+ comm->gossip.value += (comm->local_rate - comm->last_local_rate);
+ printlog(LOG_DEBUG, "GOSSIP: value: %.3f, new gossip.value: %.3f\n", value, comm->gossip.value);
}
else {
printlog(LOG_CRITICAL, "write_local_value: Invalid comm fabric: %d.\n",
}
void *limiter_receive_thread(void *unused) {
+ printlog(LOG_DEBUG, "GOSSIP: Starting the limiter_receive thread.\n");
sigset_t signal_mask;
sigemptyset(&signal_mask);
#define MAX_IDENTS (1024)
#define MAX_LIMITERS (128)
+#define TRUE (1)
+#define FALSE (0)
-#define MESH_REMOTE_AWOL_THRESHOLD (5)
-
-//FIXME: Make this more scientific?
-#define GOSSIP_REMOTE_AWOL_THRESHOLD (10 * comm->remote_node_count / comm->gossip.gossip_branch)
-
-enum transports { UDP, TCP };
+enum transports { UDP = 0, TCP = 1 };
+enum view_confidences { IN = 0, NOTIN = 1, UNSURE = 2 };
+enum reachabilities { REACHABLE = 0, SUSPECT = 1, UNREACHABLE = 2 };
typedef struct gossipval {
+ /* Fields that don't change. */
int gossip_branch;
- double last_nonzero;
+ enum memberships membership;
+ enum failure_behaviors failure_behavior;
+
+ /* Fields that change only on restart. */
+ int32_t view;
+
+ /* Fields that change frequently. */
double value;
double weight;
} gossip_t;
in_port_t port;
} remote_node_t;
+//TODO: Clean this up
typedef struct remote_limiter {
/** The last known value at the remote limiter. */
double rate;
out_neighbor_t outgoing;
/* Socket to contact this remote limiter, if using TCP. */
- int socket;
+ //int socket;
+ /** IP address of the remote limiter, in network byte order. */
in_addr_t addr;
in_port_t port;
- /** Flag to keep track of situations in which we read from this node's
- * value more than once before receiving an update from it. We use this
- * value to know when it's safe to begin decaying the remote node's value
- * (because we assume that it has failed). */
+ /** Keeps track of the number of messages we have sent to this peer without
+ * having heard from them. */
int awol;
+ /** Whether or not we think this peer is reachable. */
+ enum reachabilities reachability;
+
+ /** Count of rounds since suspicion arose, plus counts of friends that
+ * report this node as AWOL or alive. */
+ int count_rounds;
+ int count_awol;
+ int count_alive;
+
+ uint32_t incarnation;
+
+ int32_t view;
+ enum view_confidences view_confidence;
} remote_limiter_t;
+//TODO: Reduce the size of this?
+typedef struct message {
+ uint32_t magic;
+ uint32_t ident_id;
+ double value;
+ double weight;
+ uint32_t seqno;
+ uint32_t min_seqno;
+ uint16_t type;
+
+ /** tell ping target the address of node which requested ping */
+ in_addr_t ping_source;
+ in_port_t ping_port;
+ /** friend needs to be told the address of node suspected to be down */
+ in_addr_t check_target;
+ in_port_t check_port;
+ /** friend responds with ALIVE / AWOL */
+ uint32_t checkack_value;
+ /* Whether the message has an update piggybacked onto it. */
+ uint32_t update_present; // TRUE or FALSE
+ /*Node is reachable or not*/
+ uint32_t reachability;
+ /* Incarnation number of the node whose update
+ * is piggybacked on this message. */
+ uint32_t incarnation;
+ /*Address of the node whose update is being sent*/
+ remote_node_t node;
+
+ uint32_t view;
+} message_t;
+
typedef struct comm {
/** Communication policy. (COMM_MESH, COMM_GOSSIP) */
enum commfabrics comm_fabric;
/** Previous local value. */
double last_local_rate;
- double rate_change;
-
/** The number of remote nodes in the identity */
uint32_t remote_node_count;
/** Function pointer to send function. */
int (*send_function)(struct comm *comm, uint32_t id, int sock);
-#if 0
- /** Thread for handling incoming TCP data. */
- pthread_t tcp_recv_thread;
-
- /** Descriptor set for reading TCP messages */
- fd_set fds;
-#endif
+ /** Function pointer to recv function for group membership. When a message
+ * is received, it is processed normally and then handed to this function
+ * in case additional processing is necessary for group membership. */
+ int (*recv_function)(struct comm *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg);
- /** Array of integers specifiying which nodes, if any, have outstanding
- * unacked data. When nodes fall in this category, and it's time to send,
- * these nodes will be chosen first. This only affects gossip. A
- * negative number means there is no retransmit necessary. Otherwise, the
- * value is the index into the remote_limiters array of the necessary
- * retransmit. */
- int *retrys;
+ /** Function to restart the communication protocol. */
+ void (*restart_function)(struct comm *comm, int32_t view_number);
-} comm_t;
+ /** Flag indicating whether or not we are "connected" to the group
+ * membership service. This can only be false for membership schemes that
+ * require a persistent connection (Zookeeper). */
+ int connected;
-typedef struct message {
- uint32_t magic;
- uint32_t ident_id;
- uint32_t seqno;
- uint32_t min_seqno;
- double value;
- double weight;
- uint16_t type;
-} message_t;
+ /** Array of integers specifying which nodes have been selected for
+ * message transmissions during the current round. */
+ int *selected;
-typedef struct hello_message {
- uint32_t magic;
- uint32_t ident_id;
- uint16_t port;
-} hello_t;
+ /** Array of indices into remote_limiters. Used to keep a shuffled
+ * ordering for future gossip targets. */
+ int *indices;
+
+ /** The next index to use for target peer selection. The indices are
+ * re-shuffled when this reaches remote_node_count. */
+ uint32_t shuffle_index;
-#if 0
-struct recv_thread_args {
- comm_ident_t *ident;
- pthread_rwlock_t *lock;
- uint16_t port;
-};
-#endif
+ void *membership_state;
#if 0
-/**
- * Initializes the global limiter.
- *
- * @param ipaddr The IP address on which the limiter should listen.
- * INADDR_ANY will suffice. Should be specified in network byte order.
- *
- * @param port The port on which the limiter should listen. Should be specified
- * in network byte order.
- */
-void init_limiter(const in_addr_t ipaddr, const in_port_t port);
+ /** Thread for handling incoming TCP data. */
+ pthread_t tcp_recv_thread;
-/**
- * Deallocates the entire global limiter.
- */
-void destroy_limiter();
+ /** Descriptor set for reading TCP messages */
+ fd_set fds;
#endif
+} comm_t;
/**
* Fills in the communication structure of an identity.
* Calculates and reads the current aggregate value for an identity.
* This value includes the locally observed value.
*
- * @param comm The comm structure of the identity in question.
- *
- * @param aggregate The location at which the aggregate value will
- * be stored.
- *
- * @param decayto When using a mesh comm fabric, limiters whose value
- * has not been heard in several timesteps will decay to this value.
- * Generally globallimit/N.
- *
* @returns 0 on success, EINVAL on error.
*/
-int read_comm(comm_t *comm, double *aggregate, double decayto);
+int read_comm(double *aggregate, uint32_t *effective_global, comm_t *comm, uint32_t global_limit);
/**
* Updates the locally observed value of an identity.
*/
void *limiter_receive_thread(void *unused);
+#if 0
+typedef struct hello_message {
+ uint32_t magic;
+ uint32_t ident_id;
+ uint16_t port;
+} hello_t;
+
+struct recv_thread_args {
+ comm_ident_t *ident;
+ pthread_rwlock_t *lock;
+ uint16_t port;
+};
+
+/**
+ * Initializes the global limiter.
+ *
+ * @param ipaddr The IP address on which the limiter should listen.
+ * INADDR_ANY will suffice. Should be specified in network byte order.
+ *
+ * @param port The port on which the limiter should listen. Should be specified
+ * in network byte order.
+ */
+void init_limiter(const in_addr_t ipaddr, const in_port_t port);
+
+/**
+ * Deallocates the entire global limiter.
+ */
+void destroy_limiter();
+#endif
+
#endif /* _DRL_STATE_ */
double ideal_weight;
double total_weight = peer_weights + ident->last_localweight;
- if (target >= ident->limit) {
+ if (target >= ident->effective_limit) {
ideal_weight = total_weight;
} else if (target <= 0) {
ideal_weight = 0; // no flows here
} else {
- ideal_weight = ((double)target / (double)ident->limit) * total_weight;
+ ideal_weight = ((double)target / (double)ident->effective_limit) * total_weight;
}
-#if 0
- else if (peer_weights <= 0) {
-#if 0
- // doesn't matter what we pick as our weight, so pick 1 / N.
- ideal_weight = MAX_FLOW_SCALING_FACTOR / (remote_count(ident->i_handle) + 1);
-#endif
- ideal_weight = ((double)target / (double)ident->limit) * total_weight;
- } else {
-#if 0
- double divisor = (double) ident->limit - (double) target;
- ideal_weight = ((double) target * peer_weights) / divisor;
-#else
- ideal_weight = ((double)target / (double)ident->limit) * total_weight;
-#endif
- }
-#endif
-
return ideal_weight;
}
if (ident->dampen_state == DAMPEN_TEST) {
int64_t rate_delta = (int64_t) table->inst_rate - (int64_t) table->last_inst_rate;
- double threshold = (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
+ double threshold = (double) ident->effective_limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
if (rate_delta > threshold) {
ident->dampen_state = DAMPEN_PASSED;
/* Convert weight value into a rate limit. If there is no measureable
* weight, do a L/n allocation. */
if (total_weight > 0) {
- resulting_limit = (uint32_t) (ident->localweight * ident->limit / total_weight);
+ resulting_limit = (uint32_t) (ident->localweight * ident->effective_limit / total_weight);
} else {
resulting_limit = (uint32_t) (ident->limit / (ident->comm.remote_node_count + 1));
}
if (ident->dampen_state_copy == DAMPEN_TEST) {
int64_t rate_delta = (int64_t) table->inst_rate - (int64_t) table->last_inst_rate;
- double threshold = (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
+ double threshold = (double) ident->effective_limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
if (rate_delta > threshold) {
ident->dampen_state_copy = DAMPEN_PASSED;
/* Convert weight value into a rate limit. If there is no measureable
* weight, do a L/n allocation. */
if (total_weight > 0) {
- resulting_limit = (uint32_t) (ident->localweight_copy * ident->limit / total_weight);
+ resulting_limit = (uint32_t) (ident->localweight_copy * ident->effective_limit / total_weight);
} else {
resulting_limit = (uint32_t) (ident->limit / (ident->comm.remote_node_count + 1));
}
#endif
-/**
- * Determines the amount of FPS weight to allocate to the identity during each
- * estimate interval. Note that total_weight includes local weight.
- */
-static uint32_t allocate_fps_old(identity_t *ident, double total_weight) {
- common_accounting_t *ftable = &ident->common; /* Common flow table info */
- uint32_t local_rate = ftable->rate;
- uint32_t ideallocal = 0;
- double peer_weights; /* sum of weights of all other limiters */
- double idealweight = 0;
- double last_portion = 0;
- double this_portion = 0;
-
- static int dampen = 0;
- int dampen_increase = 0;
-
- double ideal_under = 0;
- double ideal_over = 0;
-
- int regime = 0;
-
- /* two cases:
- 1. the aggregate is < limit
- 2. the aggregate is >= limit
- */
- peer_weights = total_weight - ident->last_localweight;
- if (peer_weights < 0) {
- peer_weights = 0;
- }
-
- if (dampen == 1) {
- int64_t rate_delta =
- (int64_t) ftable->inst_rate - (int64_t) ftable->last_inst_rate;
- double threshold =
- (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
-
- if (rate_delta > threshold) {
- dampen_increase = 1;
- printlog(LOG_DEBUG, "DAMPEN: delta(%.3f) thresh(%.3f)\n",
- rate_delta, threshold);
- }
- }
-
- if (local_rate <= 0) {
- idealweight = 0;
- } else if (dampen_increase == 0 &&
- (ident->locallimit <= 0 || local_rate < close_enough(ident->locallimit) || ident->flowstart)) {
- /* We're under the limit - all flows are bottlenecked. */
- idealweight = allocate_fps_under_limit(ident, local_rate, peer_weights);
- ideal_over = allocate_fps_over_limit(ident);
- ideal_under = idealweight;
-
- if (ideal_over < idealweight) {
- idealweight = ideal_over;
- regime = 3;
- dampen = 2;
- } else {
- regime = 1;
- dampen = 0;
- }
-
- /* Apply EWMA */
- ident->localweight = (ident->localweight * ident->ewma_weight +
- idealweight * (1 - ident->ewma_weight));
-
- } else {
- idealweight = allocate_fps_over_limit(ident);
-
- /* Apply EWMA */
- ident->localweight = (ident->localweight * ident->ewma_weight +
- idealweight * (1 - ident->ewma_weight));
-
- /* This is the portion of the total weight in the system that was caused
- * by this limiter in the last interval. */
- last_portion = ident->last_localweight / total_weight;
-
- /* This is the fraction of the total weight in the system that our
- * proposed value for idealweight would use. */
- this_portion = ident->localweight / (peer_weights + ident->localweight);
-
- /* Dampen the large increase the first time... */
- if (dampen == 0 && (this_portion - last_portion > LARGE_INCREASE_PERCENTAGE)) {
- ident->localweight = ident->last_localweight + (LARGE_INCREASE_PERCENTAGE * total_weight);
- dampen = 1;
- } else {
- dampen = 2;
- }
-
- ideal_under = allocate_fps_under_limit(ident, local_rate, peer_weights);
- ideal_over = idealweight;
-
- regime = 2;
- }
-
- /* Convert weight into a rate - add in our new local weight */
- ident->total_weight = total_weight = ident->localweight + peer_weights;
-
- /* compute local allocation:
- if there is traffic elsewhere, use the weights
- otherwise do a L/n allocation */
- if (total_weight > 0) {
- //if (peer_weights > 0) {
- ideallocal = (uint32_t) (ident->localweight * ident->limit / total_weight);
- } else {
- ideallocal = ident->limit / (ident->comm.remote_node_count + 1);
- }
-
- printlog(LOG_DEBUG, "%.3f ActualWeight\n", ident->localweight);
-
- printlog(LOG_DEBUG, "%.3f %.3f %.3f %.3f Under / Over / Actual / Rate\n",
- ideal_under / (ideal_under + peer_weights),
- ideal_over / (ideal_over + peer_weights),
- ident->localweight / (ident->localweight + peer_weights),
- (double) local_rate / (double) ident->limit);
-
- printlog(LOG_DEBUG, "%.3f %.3f IdealUnd IdealOve\n",ideal_under,ideal_over);
-
- if (system_loglevel == LOG_DEBUG) {
- printf("local_rate: %d, idealweight: %.3f, localweight: %.3f, total_weight: %.3f\n",
- local_rate, idealweight, ident->localweight, total_weight);
- }
-
-#if 0
- if (printcounter <= 0) {
- struct timeval tv;
- double time_now;
-
- gettimeofday(&tv, NULL);
- time_now = (double) tv.tv_sec + (double) ((double) tv.tv_usec / (double) 1000000);
-
- printlog(LOG_WARN, "%.2f %d %.2f %.2f %.2f %d %d %d %d %d %d %d %d ", time_now, ftable->inst_rate,
- idealweight, ident->localweight, total_weight, ftable->num_flows, ftable->num_flows_5k,
- ftable->num_flows_10k, ftable->num_flows_20k, ftable->num_flows_50k, ftable->avg_rate,
- ftable->max_flow_rate, ftable->max_flow_rate_flow_hash);
-
- printcounter = PRINT_COUNTER_RESET;
- } else {
- printcounter -= 1;
- }
-
- //printf("Dampen: %d, dampen_increase: %d, peer_weights: %.3f, regime: %d\n",
- // dampen, dampen_increase, peer_weights, regime);
-
- if (regime == 3) {
- printlog(LOG_DEBUG, "MIN: min said to use flow counting, which was %.3f when other method said %.3f.\n",
- ideal_over, ideal_under);
- }
- See print_statistics()
-#endif
-
- printlog(LOG_DEBUG, "ideallocal is %d\n", ideallocal);
-
- return(ideallocal);
-}
-
/**
* Determines the local drop probability for a GRD identity every estimate
* interval.
*/
static double allocate_grd(identity_t *ident, double aggdemand) {
double dropprob;
- double global_limit = ident->limit;
double min_dropprob = ident->drop_prob * GRD_BIG_DROP;
struct timeval tv;
gettimeofday(&tv, NULL);
time_now = (double) tv.tv_sec + (double) ((double) tv.tv_usec / (double) 1000000);
- if (aggdemand > global_limit) {
- dropprob = (aggdemand-global_limit)/aggdemand;
+ if (aggdemand > ident->effective_limit) {
+ dropprob = (aggdemand - ident->effective_limit) / aggdemand;
} else {
dropprob = 0.0;
}
*/
static void allocate(limiter_t *limiter, identity_t *ident) {
/* Represents aggregate rate for GRD and aggregate weight for FPS. */
- double comm_val = 0;
-
- /* Read comm_val from comm layer. */
- if (limiter->policy == POLICY_FPS) {
- read_comm(&ident->comm, &comm_val,
- ident->total_weight / (double) (ident->comm.remote_node_count + 1));
- } else {
- read_comm(&ident->comm, &comm_val,
- (double) (ident->limit / (double) (ident->comm.remote_node_count + 1)));
- }
- printlog(LOG_DEBUG, "%.3f Aggregate weight/rate (FPS/GRD)\n", comm_val);
+ double aggregate = 0;
+ /* Read aggregate from comm layer. */
+ read_comm(&aggregate, &ident->effective_limit, &ident->comm, ident->limit);
+ printlog(LOG_DEBUG, "%.3f Aggregate weight/rate (FPS/GRD)\n", aggregate);
+
/* Experimental printing. */
printlog(LOG_DEBUG, "%.3f \t Kbps used rate. ID:%d\n",
(double) ident->common.rate / (double) 128, ident->id);
if (limiter->policy == POLICY_FPS) {
#ifdef SHADOW_ACCTING
- allocate_fps_pretend(ident, comm_val, &ident->shadow_common, "SHADOW-ID");
+ allocate_fps_pretend(ident, aggregate, &ident->shadow_common, "SHADOW-ID");
ident->last_localweight_copy = ident->localweight_copy;
#endif
- ident->locallimit = allocate_fps(ident, comm_val, &ident->common, "ID");
+ ident->locallimit = allocate_fps(ident, aggregate, &ident->common, "ID");
ident->last_localweight = ident->localweight;
/* Update other limiters with our weight by writing to comm layer. */
write_local_value(&ident->comm, ident->localweight);
} else {
ident->last_drop_prob = ident->drop_prob;
- ident->drop_prob = allocate_grd(ident, comm_val);
+ ident->drop_prob = allocate_grd(ident, aggregate);
/* Update other limiters with our rate by writing to comm layer. */
write_local_value(&ident->comm, ident->common.rate);
#include "peer_comm.h"
#include "logging.h"
-/* Artifically makes a network partition. */
-int do_partition = 0;
-int partition_set = 0xfffffff;
+#define NULL_PEER (-2)
+#define MESH_REMOTE_AWOL_THRESHOLD (5)
+#define GOSSIP_REMOTE_AWOL_THRESHOLD (5)
-extern limiter_t limiter;
+/* From ulogd_DRL.c */
+extern int do_partition;
+extern int partition_set;
-static const uint32_t MAGIC_MSG = 0x123123;
-static const uint32_t MAGIC_HELLO = 0x456456;
-static const uint16_t MSG = 1;
-static const uint16_t ACK = 2;
+extern limiter_t limiter;
-static void message_to_hbo(message_t *msg) {
+void message_to_hbo(message_t *msg) {
msg->magic = ntohl(msg->magic);
msg->ident_id = ntohl(msg->ident_id);
+ /* value is a double */
+ /* weight is a double */
msg->seqno = ntohl(msg->seqno);
msg->min_seqno = ntohl(msg->min_seqno);
msg->type = ntohs(msg->type);
- /* value is a double */
- /* weight is a double */
+ /* ping_source, ping_port, check_target, and check_port stay in nbo. */
+ msg->checkack_value = ntohl(msg->checkack_value);
+ msg->update_present = ntohl(msg->update_present);
+ msg->reachability = ntohl(msg->reachability);
+ msg->incarnation = ntohl(msg->incarnation);
+ /* node has two fields, both stay in nbo. */
+ msg->view = ntohl(msg->view);
}
-static void message_to_nbo(message_t *msg) {
+void message_to_nbo(message_t *msg) {
msg->magic = htonl(msg->magic);
msg->ident_id = htonl(msg->ident_id);
+ /* value is a double */
+ /* weight is a double */
msg->seqno = htonl(msg->seqno);
msg->min_seqno = htonl(msg->min_seqno);
msg->type = htons(msg->type);
- /* value is a double */
- /* weight is a double */
-}
-
-static void hello_to_hbo(hello_t *hello) {
- hello->magic = ntohl(hello->magic);
- hello->ident_id = ntohl(hello->ident_id);
- hello->port = ntohs(hello->port);
-}
-
-static void hello_to_nbo(hello_t *hello) {
- hello->magic = htonl(hello->magic);
- hello->ident_id = htonl(hello->ident_id);
- hello->port = htons(hello->port);
-}
-
-static int is_connected(remote_limiter_t *remote) {
- struct sockaddr_in addr;
- socklen_t addrlen = sizeof(addr);
-
- if (getpeername(remote->socket, (struct sockaddr *) &addr, &addrlen) == 0)
- return 1;
- else
- return 0;
+ /* ping_source, ping_port, check_target, and check_port already in nbo. */
+ msg->checkack_value = htonl(msg->checkack_value);
+ msg->update_present = htonl(msg->update_present);
+ msg->reachability = htonl(msg->reachability);
+ msg->incarnation = htonl(msg->incarnation);
+ /* node has two fields, both already in nbo. */
+ msg->view = htonl(msg->view);
}
-static int send_ack(identity_t *ident, remote_limiter_t *remote, uint32_t seqno) {
+int send_ack(uint32_t id, remote_limiter_t *remote, uint32_t seqno, uint16_t type, int32_t view) {
int result = 0;
message_t msg;
struct sockaddr_in toaddr;
memset(&msg, 0, sizeof(msg));
msg.magic = MAGIC_MSG;
- msg.ident_id = ident->id;
- msg.type = ACK;
+ msg.ident_id = id;
+ msg.type = type;
msg.seqno = seqno;
+ msg.view = view;
message_to_nbo(&msg);
return;
}
- switch (ident->comm.comm_fabric) {
- case COMM_MESH: {
- /* Use the message's value to be our new GRDrate/FPSweight for the
- * message's sender. */
- remote->rate = msg.value;
-
- /* Reset the AWOL counter to zero since we received an update. */
- remote->awol = 0;
- }
- break;
-
- case COMM_GOSSIP: {
- if (msg.type == ACK) {
- if (msg.seqno == remote->outgoing.next_seqno - 1) {
- int i;
-
- /* Ack for most recent message. Clear saved state. */
- remote->outgoing.first_seqno = remote->outgoing.next_seqno;
- remote->outgoing.saved_value = 0;
- remote->outgoing.saved_weight = 0;
-
- for (i = 0; i < ident->comm.gossip.gossip_branch; ++i) {
- if (ident->comm.retrys[i] >= 0 &&
- remote == &ident->comm.remote_limiters[ident->comm.retrys[i]]) {
- ident->comm.retrys[i] = -2;
- }
- }
- }
- /* Ignore ack if it isn't for most recent message. */
- } else {
- if (msg.min_seqno > remote->incoming.seen_seqno) {
- /* Entirely new information */
- remote->incoming.seen_seqno = msg.seqno;
- remote->incoming.saved_value = msg.value;
- remote->incoming.saved_weight = msg.weight;
- ident->comm.gossip.value += msg.value;
- ident->comm.gossip.weight += msg.weight;
- send_ack(ident, remote, msg.seqno);
- remote->awol = 0;
- } else if (msg.seqno > remote->incoming.seen_seqno) {
- /* Only some of the message is old news. */
- double diff_value = msg.value - remote->incoming.saved_value;
- double diff_weight = msg.weight - remote->incoming.saved_weight;
-
- remote->incoming.seen_seqno = msg.seqno;
- remote->incoming.saved_value = msg.value;
- remote->incoming.saved_weight = msg.weight;
-
- ident->comm.gossip.value += diff_value;
- ident->comm.gossip.weight += diff_weight;
- send_ack(ident, remote, msg.seqno);
- remote->awol = 0;
- } else {
- /* The entire message is old news. (Duplicate). */
- /* Do nothing. */
- }
- }
- }
- break;
-
- default: {
- printlog(LOG_CRITICAL, "ERR: Unknown identity comm fabric.\n");
- }
- }
-
+ /* Pass the message to the comm's recv function, which is responsible for
+ * processing its contents. */
+ ident->comm.recv_function(&ident->comm, ident->id, limiter.udp_socket, remote, &msg);
+
pthread_mutex_unlock(&ident->comm.lock);
pthread_rwlock_unlock(&limiter.limiter_lock);
}
-#if 0
-static void limiter_accept(comm_limiter_t *limiter) {
- int sock, result;
- struct sockaddr_in fromaddr;
- socklen_t fromlen = sizeof(fromaddr);
- remote_node_t sender;
- remote_limiter_t *remote;
- hello_t hello;
- comm_ident_t *ident;
- ident_handle *handle = NULL;
-
- sock = accept(limiter->tcp_socket, (struct sockaddr *)&fromaddr, &fromlen);
-
- assert(sock > 0);
-
- memset(&hello, 0, sizeof(hello_t));
- result = recv(sock, &hello, sizeof(hello_t), 0);
+int recv_mesh(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) {
+ /* Use the message's value to be our new GRDrate/FPSweight for the
+ * message's sender. */
+ remote->rate = msg->value;
- if (result < 0) {
- close(sock);
- return; /* Failure - ignore it. */
- }
-
- assert(result == sizeof(hello_t));
+ /* Reset the AWOL counter to zero since we received an update. */
+ remote->awol = 0;
+ remote->reachability = REACHABLE;
- hello_to_hbo(&hello);
-
- assert(hello.magic == MAGIC_HELLO);
-
- memset(&sender, 0, sizeof(remote_node_t));
- sender.addr = fromaddr.sin_addr.s_addr;
- sender.port = ntohs(hello.port);
-
- pthread_testcancel();
-
- pthread_rwlock_rdlock(&limiter->rwlock);
-
- handle = map_search(limiter->ident_id_to_handle, (void *) &hello.ident_id, sizeof(hello.ident_id));
-
- if (handle == NULL) {
- printlog(LOG_WARN, "WARN:recvd hello for unknown identity.\n");
- pthread_rwlock_unlock(&limiter->rwlock);
- return;
- }
-
- ident = limiter->identities[*handle];
- assert(ident != NULL);
-
- pthread_mutex_lock(&ident->lock);
-
- remote = map_search(ident->remote_node_map, &sender, sizeof(remote_node_t));
-
- if (remote == NULL) {
- printlog(LOG_WARN, "WARN: Accepted connection from unknown identity.\n");
- pthread_mutex_unlock(&ident->lock);
- pthread_rwlock_unlock(&limiter->rwlock);
- close(sock);
- return;
- }
-
- if (is_connected(remote)) {
- /* We are still connected, don't need the new socket. */
- close(sock);
- pthread_mutex_unlock(&ident->lock);
- pthread_rwlock_unlock(&limiter->rwlock);
- return;
- }
-
- /* We weren't connected, but we are now... */
- remote->socket = sock;
- printf("Got connection on: %d\n", sock);
- FD_SET(sock, &ident->fds);
-
- pthread_mutex_unlock(&ident->lock);
- pthread_rwlock_unlock(&limiter->rwlock);
-}
-
-static void read_tcp_message(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock, int sock) {
- int result;
- message_t msg;
-
- memset(&msg, 0, sizeof(message_t));
-
- result = recv(sock, &msg, sizeof(message_t), 0);
-
- if (result < 0) {
- pthread_rwlock_rdlock(limiter_rwlock);
- pthread_mutex_lock(&ident->lock);
- FD_CLR(sock, &ident->fds);
- close(sock);
- pthread_mutex_unlock(&ident->lock);
- pthread_rwlock_unlock(limiter_rwlock);
- return;
- }
-
- assert(result == sizeof(message_t));
-
- message_to_hbo(&msg);
- assert(msg.magic == MAGIC_MSG);
-
- pthread_rwlock_rdlock(limiter_rwlock);
- pthread_mutex_lock(&ident->lock);
-
- switch (ident->comm_fabric) {
- case COMM_GOSSIP: {
- ident->gossip.value += msg.value;
- ident->gossip.weight += msg.weight;
- }
- break;
-
- default: {
- assert(1 == 0); /* This case shouldn't happen. Punt for now... */
- }
- }
- pthread_mutex_unlock(&ident->lock);
- pthread_rwlock_unlock(limiter_rwlock);
+ return 0;
}
-static void ident_receive(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock) {
- int select_result, i;
- fd_set fds_copy;
- struct timeval timeout;
-
- FD_ZERO(&fds_copy);
- timeout.tv_sec = 15;
- timeout.tv_usec = 0;
-
- pthread_rwlock_rdlock(limiter_rwlock);
- pthread_mutex_lock(&ident->lock);
- memcpy(&fds_copy, &ident->fds, sizeof(fd_set));
- pthread_mutex_unlock(&ident->lock);
- pthread_rwlock_unlock(limiter_rwlock);
-
- /* mask interrupt signals for this thread? */
-
- select_result = select(FD_SETSIZE, &fds_copy, NULL, NULL, &timeout);
-
- assert(select_result >= 0);
-
- if (select_result == 0)
- return; /* Timed out */
-
- for (i = 0; (i < FD_SETSIZE) && select_result; ++i) {
- if (FD_ISSET(i, &fds_copy)) {
- read_tcp_message(ident, limiter_rwlock, i);
- select_result--;
- }
- }
-}
-#endif
-
-/* Turn this on to simulate network partitions.
- * Turn off for production settings. */
-//#define ALLOW_PARTITION
-
int send_udp_mesh(comm_t *comm, uint32_t id, int sock) {
int result = 0;
remote_limiter_t *remote;
msg.magic = MAGIC_MSG;
msg.ident_id = id;
msg.value = comm->local_rate;
+ msg.view = comm->gossip.view;
/* Do we want seqnos for mesh? We can get by without them. */
message_to_nbo(&msg);
for (i = 0; i < comm->remote_node_count; ++i) {
remote = &comm->remote_limiters[i];
+ /* Increase this counter. For mesh, it represents the number of messages we have sent to
+ * this remote limiter without having heard from it. This is reset to 0 when we receive
+ * an update from this peer. */
+ remote->awol += 1;
+ if (remote->awol > MESH_REMOTE_AWOL_THRESHOLD) {
+ remote->reachability = UNREACHABLE;
+ }
+
#ifdef ALLOW_PARTITION
if (do_partition) {
return result;
}
+int recv_gossip(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) {
+ if (msg->type == ACK) {
+ /* If ACK was received then reset the awol count */
+ if (msg->seqno == remote->outgoing.next_seqno - 1) {
+ /* Ack for most recent message. Clear saved state. */
+ remote->outgoing.first_seqno = remote->outgoing.next_seqno;
+ remote->outgoing.saved_value = 0;
+ remote->outgoing.saved_weight = 0;
+
+ remote->awol = 0;
+ }
+ /* Ignore ack if it isn't for most recent message. */
+ } else if (msg->type == MSG) {
+ if (msg->min_seqno > remote->incoming.seen_seqno) {
+ /* Entirely new information */
+ remote->incoming.seen_seqno = msg->seqno;
+ remote->incoming.saved_value = msg->value;
+ remote->incoming.saved_weight = msg->weight;
+ comm->gossip.value += msg->value;
+ comm->gossip.weight += msg->weight;
+ send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+ remote->awol = 0;
+ }
+ else if (msg->seqno > remote->incoming.seen_seqno) {
+ /* Only some of the message is old news. */
+ double diff_value = msg->value - remote->incoming.saved_value;
+ double diff_weight = msg->weight - remote->incoming.saved_weight;
+
+ remote->incoming.seen_seqno = msg->seqno;
+ remote->incoming.saved_value = msg->value;
+ remote->incoming.saved_weight = msg->weight;
+
+ comm->gossip.value += diff_value;
+ comm->gossip.weight += diff_weight;
+ send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+ remote->awol = 0;
+ }
+ else {
+ /* The entire message is old news. (Duplicate). */
+ /* Do nothing. */
+ }
+ }
+
+ return 0;
+}
+
+int find_gossip_target(comm_t *comm) {
+ int target = NULL_PEER;
+ int k;
+
+ if (comm->shuffle_index < comm->remote_node_count) {
+ target = comm->indices[comm->shuffle_index];
+ printlog(LOG_DEBUG,"GOSSIP: found index %d.\n", target);
+ comm->shuffle_index++;
+ }
+ else {
+ // shuffle the remote_limiters array
+ printlog(LOG_DEBUG, "GOSSIP: shuffling the array.\n");
+ for ( k = 0; k < comm->remote_node_count; k++) {
+ uint32_t l = myrand() % comm->remote_node_count;
+ int t;
+ t = comm->indices[l];
+ comm->indices[l] = comm->indices[k];
+ comm->indices[k] = t;
+ }
+ comm->shuffle_index = 0;
+ target = comm->indices[comm->shuffle_index];
+ printlog(LOG_DEBUG,"GOSSIP: found index after spilling over %d.\n", target);
+ comm->shuffle_index++;
+ }
+ return target;
+}
+
int send_udp_gossip(comm_t *comm, uint32_t id, int sock) {
- int i, j, targetid;
- int awol_threshold = GOSSIP_REMOTE_AWOL_THRESHOLD;
- int rand_count; //HACK...
+ int i, j;
+ int retry_index = 0;
int result = 0;
remote_limiter_t *remote;
struct sockaddr_in toaddr;
* that was sent to the peers. In the case of not being able to send to a
* peer though, we increment this to reclaim the value/weight locally. */
int message_portion = 1;
-
+
memset(&toaddr, 0, sizeof(struct sockaddr_in));
toaddr.sin_family = AF_INET;
msg_weight = comm->gossip.weight / (comm->gossip.gossip_branch + 1);
for (i = 0; i < comm->gossip.gossip_branch; ++i) {
+ int targetid = NULL_PEER;
+ int rand_count = 0;
message_t msg;
printlog(LOG_DEBUG, "Gossip loop iteration, i=%d, branch=%d\n", i, comm->gossip.gossip_branch);
- if (comm->retrys[i] >= 0) {
- remote = &comm->remote_limiters[comm->retrys[i]];
- targetid = comm->retrys[i];
-
- if (remote->awol > awol_threshold) {
- message_portion += 1;
- printlog(LOG_DEBUG, "Gossip: Ignoring AWOL peer id %d.\n", comm->retrys[i]);
- comm->retrys[i] = -1;
- continue;
+ /* If there are any peers with unacked messages, select them first. */
+ while (retry_index < comm->remote_node_count) {
+ if (comm->remote_limiters[retry_index].awol > 0 && comm->remote_limiters[retry_index].reachability == REACHABLE) {
+ targetid = retry_index;
+ printlog(LOG_DEBUG, "GOSSIP: Selected peerindex %d because it had unacked messages.\n", targetid);
}
- } else {
- targetid = -2;
- rand_count = 0;
-
- while (targetid == -2 && rand_count < 50) {
- targetid = myrand() % comm->remote_node_count;
- rand_count += 1;
-
- /* Don't select an already-used index. */
- for (j = 0; j < comm->gossip.gossip_branch; ++j) {
- if (targetid == comm->retrys[j] || comm->remote_limiters[targetid].awol > awol_threshold) {
- printlog(LOG_DEBUG, "Gossip: disqualified targetid %d. retrys[j] is %d, and remote awol count is %d\n", targetid, comm->retrys[j], comm->remote_limiters[targetid].awol);
- targetid = -2;
- break;
- }
+
+ retry_index += 1;
+ }
+
+ while (targetid == NULL_PEER && rand_count < 10) {
+ /* Select a recipient from a randomly-shuffled array. */
+ targetid = find_gossip_target(comm);
+
+ assert(targetid != NULL_PEER);
+
+ /* Don't select an already-used index. */
+ for (j = 0; j < i; ++j) {
+ if (targetid == comm->selected[j]) {
+ printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d. selected[j=%d] is %d\n", targetid, j, comm->selected[j]);
+ targetid = NULL_PEER;
+ break;
}
}
- if (targetid < 0) {
- /* Couldn't find a suitable peer to send to... */
- message_portion += 1;
- printlog(LOG_DEBUG, "Gossip: exhausted random peer search.\n");
- continue;
- } else {
- printlog(LOG_DEBUG, "Gossip: settled on peer id %d.\n", targetid);
+ /* Don't select an unreachable peer or one that is not in our view. */
+ if (targetid != NULL_PEER) {
+ if (comm->remote_limiters[targetid].reachability != REACHABLE ||
+ comm->remote_limiters[targetid].view != comm->gossip.view ||
+ comm->remote_limiters[targetid].view_confidence != IN) {
+ printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d, reachability is %d, remote view is %d (confidence:%d), my view is %d\n",
+ targetid, comm->remote_limiters[targetid].reachability, comm->remote_limiters[targetid].view,
+ comm->remote_limiters[targetid].view_confidence, comm->gossip.view);
+ targetid = NULL_PEER;
+ }
}
- remote = &comm->remote_limiters[targetid];
+ rand_count++;
}
-
- comm->retrys[i] = targetid;
+
+ if (targetid == NULL_PEER) {
+ /* Couldn't find a suitable peer to send to... */
+ message_portion += 1;
+ printlog(LOG_DEBUG, "GOSSIP: exhausted random peer search.\n");
+ continue;
+ } else {
+ printlog(LOG_DEBUG, "GOSSIP: settled on peer id %d.\n", targetid);
+ }
+
+ remote = &comm->remote_limiters[targetid];
+ comm->selected[i] = targetid;
toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
toaddr.sin_port = remote->port;
msg.seqno = remote->outgoing.next_seqno;
msg.min_seqno = remote->outgoing.first_seqno;
msg.type = MSG;
-
+ msg.view = comm->gossip.view;
+
remote->outgoing.next_seqno++;
remote->outgoing.saved_value += msg_value;
remote->outgoing.saved_weight += msg_weight;
+ /* Represents the number of messages we have sent to this peer without hearing a message or ACK from it. */
+ remote->awol += 1;
+
#ifdef ALLOW_PARTITION
if (do_partition && ((partition_set & (1 << targetid)) == 0)) {
- printlog(LOG_DEBUG, "Partition: Gossip: ignoring targetid %d\n", targetid);
+ printlog(LOG_DEBUG, "Partition: GOSSIP: ignoring targetid %d\n", targetid);
continue;
}
break;
}
}
-
comm->gossip.value = msg_value * message_portion;
comm->gossip.weight = msg_weight * message_portion;
return result;
}
+
+/* Old TCP code. */
#if 0
+static void hello_to_hbo(hello_t *hello) {
+ hello->magic = ntohl(hello->magic);
+ hello->ident_id = ntohl(hello->ident_id);
+ hello->port = ntohs(hello->port);
+}
+
+static void hello_to_nbo(hello_t *hello) {
+ hello->magic = htonl(hello->magic);
+ hello->ident_id = htonl(hello->ident_id);
+ hello->port = htons(hello->port);
+}
+
+static int is_connected(remote_limiter_t *remote) {
+ struct sockaddr_in addr;
+ socklen_t addrlen = sizeof(addr);
+
+ if (getpeername(remote->socket, (struct sockaddr *) &addr, &addrlen) == 0)
+ return 1;
+ else
+ return 0;
+}
+
int send_tcp_gossip(comm_ident_t *ident, FILE *logfile, int unused) {
int i, targetid, sock;
int result = 0;
return result;
}
-#endif
-#if 0
void *limiter_accept_thread(void *limiter) {
sigset_t signal_mask;
}
pthread_exit(NULL);
}
+
+static void limiter_accept(comm_limiter_t *limiter) {
+ int sock, result;
+ struct sockaddr_in fromaddr;
+ socklen_t fromlen = sizeof(fromaddr);
+ remote_node_t sender;
+ remote_limiter_t *remote;
+ hello_t hello;
+ comm_ident_t *ident;
+ ident_handle *handle = NULL;
+
+ sock = accept(limiter->tcp_socket, (struct sockaddr *)&fromaddr, &fromlen);
+
+ assert(sock > 0);
+
+ memset(&hello, 0, sizeof(hello_t));
+ result = recv(sock, &hello, sizeof(hello_t), 0);
+
+ if (result < 0) {
+ close(sock);
+ return; /* Failure - ignore it. */
+ }
+
+ assert(result == sizeof(hello_t));
+
+ hello_to_hbo(&hello);
+
+ assert(hello.magic == MAGIC_HELLO);
+
+ memset(&sender, 0, sizeof(remote_node_t));
+ sender.addr = fromaddr.sin_addr.s_addr;
+ sender.port = ntohs(hello.port);
+
+ pthread_testcancel();
+
+ pthread_rwlock_rdlock(&limiter->rwlock);
+
+ handle = map_search(limiter->ident_id_to_handle, (void *) &hello.ident_id, sizeof(hello.ident_id));
+
+ if (handle == NULL) {
+ printlog(LOG_WARN, "WARN:recvd hello for unknown identity.\n");
+ pthread_rwlock_unlock(&limiter->rwlock);
+ return;
+ }
+
+ ident = limiter->identities[*handle];
+ assert(ident != NULL);
+
+ pthread_mutex_lock(&ident->lock);
+
+ remote = map_search(ident->remote_node_map, &sender, sizeof(remote_node_t));
+
+ if (remote == NULL) {
+ printlog(LOG_WARN, "WARN: Accepted connection from unknown identity.\n");
+ pthread_mutex_unlock(&ident->lock);
+ pthread_rwlock_unlock(&limiter->rwlock);
+ close(sock);
+ return;
+ }
+
+ if (is_connected(remote)) {
+ /* We are still connected, don't need the new socket. */
+ close(sock);
+ pthread_mutex_unlock(&ident->lock);
+ pthread_rwlock_unlock(&limiter->rwlock);
+ return;
+ }
+
+ /* We weren't connected, but we are now... */
+ remote->socket = sock;
+ printf("Got connection on: %d\n", sock);
+ FD_SET(sock, &ident->fds);
+
+ pthread_mutex_unlock(&ident->lock);
+ pthread_rwlock_unlock(&limiter->rwlock);
+}
+
+static void read_tcp_message(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock, int sock) {
+ int result;
+ message_t msg;
+
+ memset(&msg, 0, sizeof(message_t));
+
+ result = recv(sock, &msg, sizeof(message_t), 0);
+
+ if (result < 0) {
+ pthread_rwlock_rdlock(limiter_rwlock);
+ pthread_mutex_lock(&ident->lock);
+ FD_CLR(sock, &ident->fds);
+ close(sock);
+ pthread_mutex_unlock(&ident->lock);
+ pthread_rwlock_unlock(limiter_rwlock);
+ return;
+ }
+
+ assert(result == sizeof(message_t));
+
+ message_to_hbo(&msg);
+ assert(msg.magic == MAGIC_MSG);
+
+ pthread_rwlock_rdlock(limiter_rwlock);
+ pthread_mutex_lock(&ident->lock);
+
+ switch (ident->comm_fabric) {
+ case COMM_GOSSIP: {
+ ident->gossip.value += msg.value;
+ ident->gossip.weight += msg.weight;
+ }
+ break;
+
+ default: {
+ assert(1 == 0); /* This case shouldn't happen. Punt for now... */
+ }
+ }
+ pthread_mutex_unlock(&ident->lock);
+ pthread_rwlock_unlock(limiter_rwlock);
+}
+
+static void ident_receive(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock) {
+ int select_result, i;
+ fd_set fds_copy;
+ struct timeval timeout;
+
+ FD_ZERO(&fds_copy);
+ timeout.tv_sec = 15;
+ timeout.tv_usec = 0;
+
+ pthread_rwlock_rdlock(limiter_rwlock);
+ pthread_mutex_lock(&ident->lock);
+ memcpy(&fds_copy, &ident->fds, sizeof(fd_set));
+ pthread_mutex_unlock(&ident->lock);
+ pthread_rwlock_unlock(limiter_rwlock);
+
+ /* mask interrupt signals for this thread? */
+
+ select_result = select(FD_SETSIZE, &fds_copy, NULL, NULL, &timeout);
+
+ assert(select_result >= 0);
+
+ if (select_result == 0)
+ return; /* Timed out */
+
+ for (i = 0; (i < FD_SETSIZE) && select_result; ++i) {
+ if (FD_ISSET(i, &fds_copy)) {
+ read_tcp_message(ident, limiter_rwlock, i);
+ select_result--;
+ }
+ }
+}
#endif
#ifndef _PEER_COMM_H_
#define _PEER_COMM_H_
+#define NULL_PEER (-2)
+#define MESH_REMOTE_AWOL_THRESHOLD (5)
+#define GOSSIP_REMOTE_AWOL_THRESHOLD (5)
+
+static const uint32_t MAGIC_MSG = 0x123123;
+static const uint32_t MAGIC_HELLO = 0x456456;
+static const uint16_t MSG = 1;
+static const uint16_t ACK = 2;
+
void limiter_receive();
-#if 0
-void *limiter_accept_thread(void *limiter);
+void message_to_hbo(message_t *msg);
-void *ident_receive_thread(void *limiter);
-#endif
+void message_to_nbo(message_t *msg);
int send_udp_mesh(comm_t *comm, uint32_t id, int sock);
+int recv_mesh(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg);
+
int send_udp_gossip(comm_t *comm, uint32_t id, int sock);
+int recv_gossip(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg);
+
+int find_gossip_target(comm_t *comm);
+
+int send_ack(uint32_t id, remote_limiter_t *remote, uint32_t seqno, uint16_t type, int32_t view);
#if 0
+void *limiter_accept_thread(void *limiter);
+
+void *ident_receive_thread(void *limiter);
+
int send_tcp_gossip(comm_ident_t *ident, FILE *logfile, int unused);
#endif
enum policies { POLICY_GRD = 1, POLICY_FPS = 2 };
enum commfabrics { COMM_MESH = 1, COMM_GOSSIP = 2 };
-enum accountings { ACT_STANDARD = 1, ACT_SAMPLEHOLD = 2, ACT_SIMPLE = 3, ACT_MULTIPLE = 4};
-enum dampenings { DAMPEN_NONE = 0, DAMPEN_TEST = 1, DAMPEN_FAILED = 2, DAMPEN_PASSED = 3, DAMPEN_SKIP = 4};
+enum accountings { ACT_STANDARD = 1, ACT_SAMPLEHOLD = 2, ACT_SIMPLE = 3, ACT_MULTIPLE = 4 };
+enum dampenings { DAMPEN_NONE = 0, DAMPEN_TEST = 1, DAMPEN_FAILED = 2, DAMPEN_PASSED = 3, DAMPEN_SKIP = 4 };
+enum memberships { SWIM = 1, ZOOKEEPER = 2 };
+enum failure_behaviors { PANIC = 1, QUORUM = 2 };
/* The comm library also has definitions for comfabrics. This prevents us
* from defining them twice. */
* any type of production setting. */
//#define SHADOW_ACCTING
+/* Turn this on to simulate network partitions.
+ * Turn off for production settings. */
+#define ALLOW_PARTITION
+
/* forward declare some structs */
struct limiter;
struct identity;
/** The global rate limit. */
uint32_t limit;
+ /** The effective global rate limit. Can be lower than the nominal global
+ * rate limit due to the failure of one or more peers. */
+ uint32_t effective_limit;
+
/** The local rate limit. */
uint32_t locallimit;
--- /dev/null
+/* See the DRL-LICENSE file for this file's software license. */
+
+#include <errno.h>
+
+/* Debug output. */
+#include <stdio.h>
+#include <stdlib.h>
+
+/* Socket functions. */
+#include <sys/types.h>
+#include <sys/socket.h>
+
+/* Byte ordering and address structures. */
+#include <arpa/inet.h>
+
+/* memset() */
+#include <string.h>
+
+#include "raterouter.h"
+#include "ratetypes.h"
+#include "drl_state.h"
+#include "peer_comm.h"
+#include "swim.h"
+#include "logging.h"
+
+/* From ulogd_DRL.c */
+extern int do_partition;
+extern int partition_set;
+
+extern limiter_t limiter;
+
+/**Finds the update, if found then frees the memory of the new_update
+ * and returns 1. If find fails then this returns 0*/
+static int find_and_update(update_t *updates, update_t *new_update) {
+ if( updates == NULL ) {
+ printlog(LOG_DEBUG, "SWIM: INFECT: no existing updates\n");
+ return 0;
+ }
+ update_t *pointer = updates;
+ while(pointer != NULL) {
+ if(pointer->remote == new_update->remote && pointer->remote->incarnation >= new_update->remote->incarnation) {
+ pointer->count = 0;
+ printlog(LOG_DEBUG, "SWIM: INFECT: update already exists\n");
+ free(new_update);
+ return 1;
+ }
+ pointer = pointer->next;
+ }
+ printlog(LOG_DEBUG, "SWIM: INFECT: update not found among existing updates\n");
+ return 0;
+}
+
+/*Just adds to the end of list and returns the head*/
+update_t *add_to_list(update_t *updates, update_t *new_update) {
+ printlog(LOG_DEBUG, "SWIM: INFECT: adding to list of updates: %s is %d\n", inet_ntoa(*(struct in_addr *)&new_update->remote->addr), new_update->remote->reachability);
+ update_t *head = updates;
+ update_t *pointer;
+ if (head == NULL) {
+ head = new_update;
+ } else {
+ pointer = head;
+ while(pointer->next != NULL) {
+ pointer = pointer->next;
+ }
+ pointer->next = new_update;
+ }
+ return head;
+}
+
+/** Given the address of the suspected node this function
+ * identifies friends who can probe the suspected node
+ * After recording these, the messages for help are sent
+ * in the next gossip round*/
+static int help_from_friends(comm_t *comm, int suspect_index, uint32_t id, int sock) {
+ printlog(LOG_DEBUG,"SWIM: In function help_from_friends suspected node: %s, index: %d\n",inet_ntoa(*(struct in_addr *)&comm->remote_limiters[suspect_index].addr), suspect_index);
+ int i=0, j = 0;
+ int result;
+ int count_friends = (comm->remote_node_count > MAX_FRIENDS ) ? MAX_FRIENDS : comm->remote_node_count; // A more logical way?
+ // remote_node_t friend_nodes[count_friends];
+ // int friend_ids[count_friends];
+
+ while( (i - j) < count_friends && i < comm->remote_node_count ) {
+ // Do not pick a friend who is suspected to be down
+ if (comm->remote_limiters[i].reachability != REACHABLE) {
+ j++; i++;
+ continue;
+ }
+ /**construct the message and send it to friend
+ * pick up friend i*/
+ message_t check_msg;
+ memset(&check_msg, 0, sizeof(message_t));
+ check_msg.magic = MAGIC_MSG;
+ check_msg.ident_id = id;
+ check_msg.value = 0;
+ check_msg.weight = 0;
+ check_msg.seqno = 0;
+ check_msg.min_seqno = 0;
+ check_msg.type = CHECK;
+ check_msg.check_target = comm->remote_limiters[suspect_index].addr;
+ check_msg.check_port = comm->remote_limiters[suspect_index].port;
+
+ // send the message
+ struct sockaddr_in toaddr;
+ memset(&toaddr, 0, sizeof(struct sockaddr_in));
+ toaddr.sin_family = AF_INET;
+ toaddr.sin_addr.s_addr = comm->remote_limiters[i].addr;
+ toaddr.sin_port = comm->remote_limiters[i].port;
+ message_to_nbo(&check_msg);
+
+ printlog(LOG_DEBUG,"SWIM: Sending CHECK message to friend %s i: %d", inet_ntoa(*(struct in_addr *)&toaddr.sin_addr.s_addr), i);
+ printlog(LOG_DEBUG," Suspect: %s", inet_ntoa(*(struct in_addr *)&check_msg.check_target));
+ printlog(LOG_DEBUG," Initial: %s suspect_index: %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[suspect_index].addr), suspect_index);
+ if (sendto(sock, &check_msg, sizeof(check_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+ printlog(LOG_WARN, "WARN: help_from_friends: sendto failed.\n");
+ result = errno;
+ printlog(LOG_WARN, " - The error was |%s|\n", strerror(result));
+ }
+ i++;
+ }
+ printlog(LOG_DEBUG,"SWIM: Out function help_from_friends.\n");
+ return 0;
+}
+
+/** Handle a CHECK packet.  The sender (the "source") suspects the node named
+ * in msg->check_target and asks us (a "friend") to probe it.  We record the
+ * (target, source) pair in swim_comm->ping_targets — so the eventual
+ * PING_ACK or a timeout can be relayed back as a CHECK_ACK — and forward a
+ * PING to the suspect. */
+static void swim_receive_check(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) {
+
+#ifdef ALLOW_PARTITION
+    /* Simulated partition: drop CHECKs from peers outside our partition set. */
+    int id;
+    for(id = 0; id < comm->remote_node_count; id++) {
+        if(comm->remote_limiters[id].addr == remote->addr && comm->remote_limiters[id].port == remote->port) {
+            if (do_partition && ((partition_set & (1 << id)) == 0)) {
+                printlog(LOG_DEBUG, "SWIM: Ignoring CHECK message from %d\n", id);
+                return;
+            }
+        }
+    }
+#endif
+
+    /* Only honor CHECK requests from peers we currently consider reachable. */
+    if(remote->reachability != REACHABLE)
+        return;
+
+    swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
+
+    /* Build the PING that will be forwarded to the suspected node. */
+    printlog(LOG_DEBUG,"SWIM: received CHECK message from %s", inet_ntoa(*(struct in_addr *)&remote->addr));
+    printlog(LOG_DEBUG,", sending PING to %s\n", inet_ntoa(*(struct in_addr *)&msg->check_target));
+    int result;
+    message_t ping_msg;
+    memset(&ping_msg, 0, sizeof(message_t));
+    ping_msg.magic = MAGIC_MSG;
+    ping_msg.ident_id = msg->ident_id;
+    ping_msg.value = 0;
+    ping_msg.weight = 0;
+    ping_msg.seqno = 0;
+    ping_msg.min_seqno = 0;
+    ping_msg.type = PING;
+    /* Tell the suspect who originally asked, so its reply can be routed. */
+    ping_msg.ping_source = remote->addr;
+    ping_msg.ping_port = remote->port;
+    // send the ping message
+    struct sockaddr_in toaddr;
+    memset(&toaddr, 0, sizeof(struct sockaddr_in));
+    toaddr.sin_family = AF_INET;
+    toaddr.sin_addr.s_addr = msg->check_target;
+    toaddr.sin_port = msg->check_port;
+    message_to_nbo(&ping_msg);
+
+    /** Add to ping targets (appended at the tail) before sending the message. */
+    ping_target_t *suspect = (ping_target_t*) malloc(sizeof(ping_target_t));
+    if (suspect == NULL) {
+        /* BUGFIX: an allocation failure previously led to a NULL dereference. */
+        printlog(LOG_WARN, "WARN: swim_receive_check: out of memory.\n");
+        return;
+    }
+    memset(suspect, 0, sizeof(ping_target_t));
+    suspect->target.addr = msg->check_target;
+    suspect->target.port = msg->check_port;
+    suspect->source.addr = remote->addr;
+    suspect->source.port = remote->port;
+    printlog(LOG_DEBUG, "SWIM: adding %s to PING list\n", inet_ntoa(*(struct in_addr *)&suspect->target.addr));
+
+    ping_target_t *pointer = swim_comm->ping_targets;
+    if( swim_comm->ping_targets != NULL ) {
+        while(pointer->next != NULL) {
+            pointer = pointer->next;
+        }
+        pointer->next = suspect;
+    }
+    else {
+        swim_comm->ping_targets = suspect;
+    }
+
+    if (sendto(limiter.udp_socket, &ping_msg, sizeof(ping_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+        /* BUGFIX: "sento" typo fixed; strerror() returns a char*, so the
+         * format specifier must be %s (was %d — undefined behavior). */
+        printlog(LOG_WARN, "WARN: swim_receive_check: sendto failed.\n");
+        result = errno;
+        printlog(LOG_WARN, " - The error was |%s|\n", strerror(result));
+    } else {
+        printlog(LOG_DEBUG,"SWIM: Sent PING message\n");
+    }
+    printlog(LOG_DEBUG,"SWIM: Processed CHECK packet\n");
+    return;
+}
+
+/** Handle a PING packet: a friend is probing us on behalf of a suspicious
+ *  source.  Reply with a PING_ACK carrying a freshly bumped incarnation so
+ *  the suspicion can be refuted, and persist the next incarnation to disk. */
+static void swim_receive_ping(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) {
+
+#ifdef ALLOW_PARTITION
+    /* Simulated partition: drop PINGs from peers outside our partition set. */
+    int id;
+    for(id = 0; id < comm->remote_node_count; id++) {
+        if(comm->remote_limiters[id].addr == remote->addr && comm->remote_limiters[id].port == remote->port) {
+            if (do_partition && ((partition_set & (1 << id)) == 0)) {
+                printlog(LOG_DEBUG, "SWIM: Ignoring PING message from %d\n", id);
+                return;
+            }
+        }
+    }
+#endif
+
+    printlog(LOG_DEBUG,"SWIM: receiving the PING message from %s\n", inet_ntoa(*(struct in_addr *)&remote->addr));
+
+    /* Only answer peers we currently consider reachable. */
+    if(remote->reachability != REACHABLE)
+        return;
+
+    int result;
+    swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
+
+    message_t pingack_msg;
+    memset(&pingack_msg, 0, sizeof(message_t));
+    pingack_msg.magic = MAGIC_MSG;
+    pingack_msg.ident_id = msg->ident_id;
+    pingack_msg.value = 0;
+    pingack_msg.weight = 0;
+    pingack_msg.seqno = 0;
+    pingack_msg.min_seqno = 0;
+    pingack_msg.type = PING_ACK;
+
+    /* Bump our incarnation and piggy-back a REACHABLE self-update on the ack. */
+    swim_comm->incarnation++;
+    pingack_msg.update_present = TRUE;
+    pingack_msg.reachability = REACHABLE;
+    pingack_msg.incarnation = swim_comm->incarnation;
+    /* Persist incarnation+1 so a restarted node resumes with a strictly
+     * greater incarnation.
+     * BUGFIX: guard against fopen() failure (was an unchecked NULL deref). */
+    FILE *fp = fopen("/root/incarnation", "w+");
+    if (fp != NULL) {
+        fprintf(fp, "%d", swim_comm->incarnation + 1);
+        fflush(fp);
+        fclose(fp);
+    } else {
+        printlog(LOG_WARN, "WARN: swim_receive_ping: could not persist incarnation.\n");
+    }
+    // send PING_ACK
+    struct sockaddr_in toaddr;
+    memset(&toaddr, 0, sizeof(struct sockaddr_in));
+    toaddr.sin_family = AF_INET;
+    toaddr.sin_addr.s_addr = remote->addr;
+    toaddr.sin_port = remote->port;
+    message_to_nbo(&pingack_msg);
+
+    if (sendto(limiter.udp_socket, &pingack_msg, sizeof(pingack_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+        /* BUGFIX: "sento" typo fixed; strerror() returns a char*, so the
+         * format specifier must be %s (was %d — undefined behavior). */
+        printlog(LOG_WARN, "WARN: swim_receive_ping: sendto failed.\n");
+        result = errno;
+        printlog(LOG_WARN, " - The error was |%s|\n", strerror(result));
+    } else {
+        printlog(LOG_DEBUG, "SWIM: sent PING_ACK to %s\n", inet_ntoa(*(struct in_addr *)&remote->addr));
+    }
+    printlog(LOG_DEBUG,"SWIM: Processed PING packet\n");
+    return;
+}
+
+/** Handle a PING_ACK packet: a suspect we probed (as a friend) is alive.
+ *  Relay CHECK_ACK/ALIVE to every source that asked about this node, drop
+ *  those entries from ping_targets, and record a REACHABLE update about the
+ *  sender for later piggy-backing on gossip. */
+static void swim_receive_pingack(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) {
+    printlog(LOG_DEBUG, "SWIM: receiving the PING_ACK message from %s\n", inet_ntoa(*(struct in_addr *)&remote->addr));
+    flushlog();
+    int result;
+    /* BUGFIX: confirm was read uninitialized when no list entry matched. */
+    int confirm = 0;
+    int delete_head = 0;
+    ping_target_t* pointer;
+    ping_target_t* prev_pointer;
+    swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
+    pointer = swim_comm->ping_targets;
+    prev_pointer = swim_comm->ping_targets;
+
+    /* An empty ping_targets list is NOT an error here: a PING_ACK triggers a
+     * CHECK_ACK to every node that requested a check, so the corresponding
+     * entries may already have been answered and removed. */
+    while(pointer!=NULL) {
+        if(pointer->target.addr == remote->addr && pointer->target.port == remote->port) {
+            /* Suspect found in the list: build a CHECK_ACK/ALIVE and send it
+             * to the source that asked us to probe. */
+            message_t checkack_msg;
+            memset(&checkack_msg, 0, sizeof(message_t));
+            checkack_msg.magic = MAGIC_MSG;
+            checkack_msg.ident_id = msg->ident_id;
+            checkack_msg.value = 0;
+            checkack_msg.weight = 0;
+            checkack_msg.seqno = 0;
+            checkack_msg.min_seqno = 0;
+            checkack_msg.type = CHECK_ACK;
+            checkack_msg.checkack_value = ALIVE;
+            // inform the source of the addr and port of suspected node
+            checkack_msg.check_target = remote->addr;
+            checkack_msg.check_port = remote->port;
+            struct sockaddr_in toaddr;
+            memset(&toaddr, 0, sizeof(struct sockaddr_in));
+            // found source
+            toaddr.sin_family = AF_INET;
+            toaddr.sin_addr.s_addr = pointer->source.addr;
+            toaddr.sin_port = pointer->source.port;
+            message_to_nbo(&checkack_msg);
+
+            if (sendto(limiter.udp_socket, &checkack_msg, sizeof(checkack_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+                /* BUGFIX: "sento" typo fixed; strerror() returns a char*, so
+                 * the format specifier must be %s (was %d — UB). */
+                printlog(LOG_WARN, "WARN: swim_receive_pingack : sendto failed.\n");
+                result = errno;
+                printlog(LOG_WARN, " - The error was |%s|\n", strerror(result));
+            }
+
+            /** Unlink and free this entry, keeping the scan position valid. */
+            if(prev_pointer == pointer) {
+                swim_comm->ping_targets = pointer->next;
+                pointer->next = NULL;
+                free(pointer);
+                pointer = swim_comm->ping_targets;
+                delete_head = 1;
+            } else {
+                prev_pointer->next = pointer->next;
+                pointer->next = NULL;
+                free(pointer);
+                pointer = prev_pointer;
+            }
+            confirm = 1;
+        }
+        prev_pointer = pointer;
+        if(pointer != NULL && delete_head != 1) {
+            pointer = pointer->next;
+        }
+        delete_head = 0;
+        printf("SWIM: PING ACK\n");
+    }
+    /* PING_ACK received: queue a REACHABLE update about the sender. */
+    remote_node_t sender;
+    memset(&sender, 0, sizeof(remote_node_t));
+    sender.addr = remote->addr;
+    sender.port = remote->port;
+    update_t* new_update = (update_t *) malloc(sizeof(update_t));
+    if (new_update == NULL) {
+        printlog(LOG_WARN, "WARN: swim_receive_pingack: out of memory.\n");
+        return;
+    }
+    memset(new_update, 0, sizeof(update_t));
+    new_update->remote = map_search(comm->remote_node_map, &sender, sizeof(remote_node_t));
+    if(new_update->remote == NULL) {
+        /* BUGFIX: previously fell through and dereferenced this NULL pointer. */
+        printlog(LOG_DEBUG, "SWIM: PANIC: PING_ACK received from an unknown node %s\n",inet_ntoa(*(struct in_addr *)&sender.addr));
+        free(new_update);
+        return;
+    }
+    new_update->count = 0;
+
+    if(msg->incarnation > new_update->remote->incarnation) {
+        new_update->remote->incarnation = msg->incarnation;
+        new_update->remote->reachability = REACHABLE;
+        new_update->remote->awol = 0;
+        new_update->remote->count_rounds = 0;
+        new_update->remote->count_awol = 0;
+        new_update->remote->count_alive = 0;
+        /* NOTE(review): when find_and_update() merges into an existing entry,
+         * new_update is never freed — looks like a leak; confirm ownership
+         * semantics of find_and_update/add_to_list before freeing here. */
+        if( find_and_update(swim_comm->updates, new_update) == 0 ) {
+            swim_comm->updates = add_to_list(swim_comm->updates, new_update);
+            swim_comm->count_updates++;
+        }
+    } else if(msg->incarnation == new_update->remote->incarnation) {
+        // if the node previously thought that sender was down then it prevails
+        // else if the node thought it was up then there is no change
+    }
+
+    if(confirm != 1) printlog(LOG_DEBUG,"SWIM: PING_ACK did not match entries in list\n");
+    printlog(LOG_DEBUG,"SWIM: Processed PING_ACK packet\n");
+    return;
+}
+
+/** Handle a CHECK_ACK packet: a friend reports its verdict (ALIVE or AWOL)
+ *  for a suspect we asked about; tally the vote on that suspect's entry. */
+static void swim_receive_checkack(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) {
+    printlog(LOG_DEBUG, "SWIM: receiving the CHECK_ACK message from %s\n", inet_ntoa(*(struct in_addr *)&remote->addr));
+    int idx;
+    for (idx = 0; idx < comm->remote_node_count; idx++) {
+        remote_limiter_t *peer = &comm->remote_limiters[idx];
+        /* Only the entry matching the suspect named in the message counts. */
+        if (peer->addr != msg->check_target || peer->port != msg->check_port)
+            continue;
+        if (msg->checkack_value == ALIVE)
+            peer->count_alive++;
+        else if (msg->checkack_value == AWOL)
+            peer->count_awol++;
+    }
+    printlog(LOG_DEBUG,"SWIM: Processed CHECK_ACK packet\n");
+    return;
+}
+
+/** Per-round SWIM maintenance, invoked from send_gossip_swim().
+ *  SOURCE role: escalate peers whose ACKs are overdue (ask friends to CHECK
+ *  them), count suspicion rounds, and decide REACHABLE/UNREACHABLE once
+ *  SOURCE_THRESHOLD rounds have passed.
+ *  FRIEND role: age outstanding PINGs and report AWOL after FRIEND_THRESHOLD
+ *  rounds without a PING_ACK.  Always returns 0. */
+static int swim_send(comm_t *comm, int id, int sock) {
+ int i, result;
+ swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
+
+ /**SOURCE: Send messages to friends to check on
+ * nodes which are suspected to be down*/
+ for(i = 0; i < comm->remote_node_count; i++) {
+ printlog(LOG_DEBUG, "SWIM: AWOL count of %s is %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), comm->remote_limiters[i].awol);
+ if(comm->remote_limiters[i].awol == GOSSIP_REMOTE_AWOL_THRESHOLD) {
+ // HACK to make sure this part of code is entered only once
+ // (the extra awol++ pushes the count past the == test above)
+ comm->remote_limiters[i].reachability = SUSPECT;
+ comm->remote_limiters[i].awol++;
+ help_from_friends(comm, i, id, sock);
+ }
+ }
+
+ /**SOURCE: Count number of rounds since the node has been suspected
+ * If in this process the count reaches threshold then take action
+ */
+ for (i = 0; i < comm->remote_node_count; i++) {
+ if(comm->remote_limiters[i].reachability == SUSPECT) {
+ comm->remote_limiters[i].count_rounds++;
+ printlog(LOG_DEBUG, "SWIM: ROUNDS count on %s index %d is %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), i, comm->remote_limiters[i].count_rounds);
+ if(comm->remote_limiters[i].count_rounds > SOURCE_THRESHOLD) {
+ /* Any ALIVE vote wins: clear suspicion entirely. */
+ if(comm->remote_limiters[i].count_alive > 0) {
+ printlog(LOG_DEBUG,"SWIM: the node %s was up, wrongly suspected\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr));
+ comm->remote_limiters[i].reachability = REACHABLE;
+ comm->remote_limiters[i].count_rounds = 0;
+ comm->remote_limiters[i].count_awol = 0;
+ comm->remote_limiters[i].count_alive = 0;
+ // FIX
+ comm->remote_limiters[i].awol = 0;
+ }
+ /* Only AWOL votes: declare the node down and queue an update. */
+ else if (comm->remote_limiters[i].count_awol > 0) {
+ comm->remote_limiters[i].reachability = UNREACHABLE;
+ update_t* new_update = (update_t *) malloc(sizeof(update_t));
+ memset(new_update, 0, sizeof(update_t));
+ new_update->remote = &comm->remote_limiters[i];
+ new_update->count = 0;
+ // comm->remote_limiters[i].incoming.seen_seqno = 0;
+ printlog(LOG_DEBUG, "SWIM: INFECT: Update down information %s\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr));
+ if(find_and_update(swim_comm->updates, new_update) == 0) {
+ swim_comm->updates = add_to_list(swim_comm->updates, new_update);
+ swim_comm->count_updates++;
+ }
+ printlog(LOG_DEBUG,"SWIM: The node %s is down. reachability: %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), comm->remote_limiters[i].reachability);// The node is check_list.target
+ }
+ else {
+ /**Even friends have not responded, request for help from more friends?*/
+ printlog(LOG_DEBUG,"SWIM: Last ditch effort, even friends did not respond\n");
+ comm->remote_limiters[i].reachability = SUSPECT;
+ comm->remote_limiters[i].count_rounds = 0; // CHECK
+ help_from_friends(comm, i, id, sock);
+ }
+ }
+ }
+ }
+
+ /**Actions performed by "FRIEND"*/
+ // DELETE THIS LOOP
+ /* (debug-only: dumps the current PING list to the log) */
+ ping_target_t* ping_list = swim_comm->ping_targets;
+ while(ping_list != NULL) {
+ printlog(LOG_DEBUG, "SWIM: in PING list %s\n", inet_ntoa(*(struct in_addr *)&ping_list->target.addr));
+ ping_list = ping_list->next;
+ }
+ ping_list = swim_comm->ping_targets;
+ ping_target_t* ping_list_prev = swim_comm->ping_targets;
+ int delete_head = 0;
+ while(ping_list != NULL) {
+ ping_list->count++;
+ printlog(LOG_DEBUG,"SWIM: friend keeping track of gossip rounds since PING.\n");
+ // if in this process some node hits threshold then
+ // send AWOL and delete it from this list
+ if(ping_list->count >= FRIEND_THRESHOLD) {
+ printlog(LOG_DEBUG,"SWIM: friend declaring AWOL.\n");
+ message_t checkack_msg;
+ memset(&checkack_msg, 0, sizeof(message_t));
+ checkack_msg.magic = MAGIC_MSG;
+ checkack_msg.ident_id = id;
+ checkack_msg.value = 0;
+ checkack_msg.weight = 0;
+ checkack_msg.seqno = 0;
+ checkack_msg.min_seqno = 0;
+ checkack_msg.type = CHECK_ACK;
+ checkack_msg.checkack_value = AWOL;
+ // inform the source of the addr and port of suspected node
+ checkack_msg.check_target = ping_list->target.addr;
+ checkack_msg.check_port = ping_list->target.port;
+ struct sockaddr_in toaddr;
+ memset(&toaddr, 0, sizeof(struct sockaddr_in));
+ toaddr.sin_family = AF_INET;
+ toaddr.sin_addr.s_addr = ping_list->source.addr;
+ toaddr.sin_port = ping_list->source.port;
+ message_to_nbo(&checkack_msg);
+
+ if (sendto(sock, &checkack_msg, sizeof(checkack_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+ printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
+ result = errno;
+ printlog(LOG_WARN, " - The error was |%d|\n", strerror(result));
+ }
+ // now the deletion: after deletion we want to continue from next node
+ printlog(LOG_DEBUG, "SWIM: deleting from PING list %s\n", inet_ntoa(*(struct in_addr *)&ping_list->target.addr));
+ if(ping_list_prev == ping_list) {
+ swim_comm->ping_targets = ping_list->next;
+ ping_list->next = NULL;
+ free(ping_list);
+ ping_list = swim_comm->ping_targets;
+ delete_head = 1;
+ } else {
+ ping_list_prev->next = ping_list->next;
+ ping_list->next = NULL;
+ free(ping_list);
+ ping_list = ping_list_prev;
+ }
+ }
+ ping_list_prev = ping_list;
+ if (ping_list != NULL && delete_head != 1) ping_list = ping_list->next;
+ delete_head = 0;
+ }
+
+ return 0;
+}
+
+
+/** Top-level SWIM packet dispatcher.
+ *  ACK / MSG carry the push-sum gossip payload (value/weight), with
+ *  membership updates optionally piggy-backed on MSG; CHECK, PING, PING_ACK
+ *  and CHECK_ACK implement the failure-detection subprotocol.
+ *  Always returns 0. */
+int swim_receive(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) {
+ swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
+
+ if (msg->type == ACK) {
+ /* If ACK was received then reset the awol count */
+ if (msg->seqno == remote->outgoing.next_seqno - 1) {
+ /* Ack for most recent message. Clear saved state. */
+ remote->outgoing.first_seqno = remote->outgoing.next_seqno;
+ remote->outgoing.saved_value = 0;
+ remote->outgoing.saved_weight = 0;
+
+ remote->awol = 0;
+ remote->count_awol = 0;
+ }
+ /* Ignore ack if it isn't for most recent message. */
+ } else if (msg->type == MSG) {
+ if (msg->min_seqno > remote->incoming.seen_seqno) {
+ /* Entirely new information */
+ remote->incoming.seen_seqno = msg->seqno;
+ remote->incoming.saved_value = msg->value;
+ remote->incoming.saved_weight = msg->weight;
+ comm->gossip.value += msg->value;
+ comm->gossip.weight += msg->weight;
+ send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+ remote->awol = 0;
+ remote->count_rounds = 0;
+ remote->count_awol = 0;
+ remote->count_alive = 0;
+
+ // check if there is an update piggy backed on this message
+ // if yes then add it to the update list
+ if(msg->update_present > 0) {
+ /* NOTE(review): new_update is leaked when temp_remote is NULL or
+ * find_and_update() merges into an existing entry — confirm
+ * ownership semantics before freeing here. */
+ update_t *new_update = (update_t *) malloc(sizeof(update_t));
+ memset(new_update, 0, sizeof(update_t));
+ // look for the remote limiter about whom update is being
+ // sent the update node is sent in the message, msg->node!
+ remote_limiter_t *temp_remote = map_search(comm->remote_node_map, &msg->node, sizeof(remote_node_t));
+ // an update about the node itself is possible in which case map_search would fail
+ if(temp_remote != NULL) {
+ printlog(LOG_DEBUG, "SWIM: INFECT: Receiving update about %s. Update says %d\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr), msg->reachability);
+ new_update->remote = temp_remote;
+ new_update->count = 0;
+ /* Higher incarnation always wins and overwrites local state. */
+ if(msg->incarnation > new_update->remote->incarnation) {
+ printlog(LOG_DEBUG, "SWIM: INFECT: Receiving update about %s. Update is about new incarnation\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr));
+ new_update->remote->reachability = msg->reachability;
+ new_update->remote->incarnation = msg->incarnation;
+ if(find_and_update(swim_comm->updates, new_update) == 0) {
+ swim_comm->updates = add_to_list(swim_comm->updates, new_update);
+ swim_comm->count_updates++;
+ }
+ } else if(msg->incarnation == new_update->remote->incarnation && new_update->remote->reachability == REACHABLE && msg->reachability == UNREACHABLE) {
+ /* Same incarnation: an UNREACHABLE claim overrides REACHABLE. */
+ printlog(LOG_DEBUG, "SWIM: INFECT: Receiving update about %s. Update about same incarnation, says node unreachable\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr));
+ new_update->remote->reachability = msg->reachability;
+ if(find_and_update(swim_comm->updates, new_update) == 0) {
+ swim_comm->updates = add_to_list(swim_comm->updates, new_update);
+ swim_comm->count_updates++;
+ } else {
+ // Ignore the update
+ printlog(LOG_DEBUG, "SWIM: INFECT: update about %s ignored\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr));
+ }
+ }
+ }
+ }
+ }
+ else if (msg->seqno > remote->incoming.seen_seqno) {
+ /* Only some of the message is old news. */
+ double diff_value = msg->value - remote->incoming.saved_value;
+ double diff_weight = msg->weight - remote->incoming.saved_weight;
+
+ remote->incoming.seen_seqno = msg->seqno;
+ remote->incoming.saved_value = msg->value;
+ remote->incoming.saved_weight = msg->weight;
+
+ comm->gossip.value += diff_value;
+ comm->gossip.weight += diff_weight;
+ send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+ remote->awol = 0;
+ remote->count_awol = 0;
+ }
+ else {
+ /* The entire message is old news. (Duplicate). */
+ /* Do nothing. */
+ }
+ // Hearing from a node previously declared unreachable
+ if(remote->reachability == UNREACHABLE) {
+ printlog(LOG_DEBUG, "SWIM: INFECT: seems like %s is back up\n", inet_ntoa(*(struct in_addr *)&remote->addr));
+ remote->reachability = SUSPECT;
+ remote->awol = GOSSIP_REMOTE_AWOL_THRESHOLD;
+ remote->count_rounds = 0;
+ remote->count_awol = 0;
+ remote->count_alive = 0;
+ }
+ }
+ else if(msg->type == CHECK) {
+ swim_receive_check(comm, sock, remote, msg);
+ }
+ else if(msg->type == PING ) {
+ swim_receive_ping(comm, sock, remote, msg);
+ }
+ else if(msg->type == PING_ACK) {
+ swim_receive_pingack(comm, sock, remote, msg);
+ }
+ else if(msg->type == CHECK_ACK) {
+ swim_receive_checkack(comm, sock, remote, msg);
+ }
+ return 0;
+}
+
+/** Send one round of push-sum gossip to up to gossip_branch peers, then run
+ *  swim_send() maintenance.  Peers with unacked messages are retried first;
+ *  otherwise a random reachable, not-yet-selected peer is chosen.  Any
+ *  value/weight share that could not be handed to a peer is reclaimed
+ *  locally via message_portion.  Returns 0, or errno from a failed sendto. */
+int send_gossip_swim(comm_t *comm, uint32_t id, int sock) {
+ int i, j;
+ int retry_index = 0;
+ int result = 0;
+ remote_limiter_t *remote;
+ struct sockaddr_in toaddr;
+ double msg_value, msg_weight;
+
+ /* This is the factor for the portion of value/weight to keep locally.
+ * Normally this is 1, meaning that we retain the same amount of value/weight
+ * that was sent to the peers. In the case of not being able to send to a
+ * peer though, we increment this to reclaim the value/weight locally. */
+ int message_portion = 1;
+
+ memset(&toaddr, 0, sizeof(struct sockaddr_in));
+ toaddr.sin_family = AF_INET;
+
+ /* Split value/weight evenly across the branch targets plus ourselves. */
+ msg_value = comm->gossip.value / (comm->gossip.gossip_branch + 1);
+ msg_weight = comm->gossip.weight / (comm->gossip.gossip_branch + 1);
+
+ /*Nodes to which message was sent will have a non-zero here*/
+ /* for (i = 0; i < comm->remote_node_count; i++) {
+ if(comm->remote_limiters[i].awol > 0)
+ comm->remote_limiters[i].awol++;
+ }*/
+
+ for (i = 0; i < comm->remote_node_count; i++) {
+ printlog(LOG_DEBUG, "SWIM: STATUS: Node: %s reachability: %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), comm->remote_limiters[i].reachability);
+ }
+
+ for (i = 0; i < comm->gossip.gossip_branch; ++i) {
+ int targetid = NULL_PEER;
+ int rand_count = 0;
+ message_t msg;
+
+ printlog(LOG_DEBUG, "Gossip loop iteration, i=%d, branch=%d\n", i, comm->gossip.gossip_branch);
+
+ /* If there are any peers with unacked messages, select them first. */
+ while (retry_index < comm->remote_node_count) {
+ if (comm->remote_limiters[retry_index].awol > 0 && comm->remote_limiters[retry_index].reachability == REACHABLE) {
+ targetid = retry_index;
+ printlog(LOG_DEBUG, "GOSSIP: Selected peerindex %d because it had unacked messages.\n", targetid);
+ retry_index += 1;
+ break;
+ }
+ retry_index += 1;
+ }
+
+ while (targetid == NULL_PEER && rand_count < 50) {
+ /* *Gossip node would be chosen from
+ * the array which would be randomly shuffled
+ * once all the nodes have been sent messages*
+ */
+ targetid = find_gossip_target(comm);
+ /* If we didn't find any peers the needed retransmissions, select one randomly here. */
+/* targetid = myrand() % comm->remote_node_count;
+ rand_count += 1;
+*/
+ /* Don't select an already-used index. */
+ for (j = 0; j < i; ++j) {
+ if (targetid == comm->selected[j]) {
+ printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d. selected[j=%d] is %d\n", targetid, j, comm->selected[j]);
+ targetid = NULL_PEER;
+ break;
+ }
+ }
+ if (targetid != NULL_PEER) {
+ if(comm->remote_limiters[targetid].reachability != REACHABLE) {
+ printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d. reachability is %d, and remote awol count is %d\n",
+ targetid, comm->remote_limiters[targetid].reachability, comm->remote_limiters[targetid].awol);
+ targetid = NULL_PEER;
+ }
+ }
+
+ rand_count++;
+ }
+
+ if (targetid < 0) {
+ /* Couldn't find a suitable peer to send to... */
+ /* Reclaim this peer's share of value/weight locally. */
+ message_portion += 1;
+ printlog(LOG_DEBUG, "GOSSIP: exhausted random peer search.\n");
+ continue;
+ } else {
+ printlog(LOG_DEBUG, "GOSSIP: settled on peer id %d.\n", targetid);
+ }
+
+ remote = &comm->remote_limiters[targetid];
+ comm->selected[i] = targetid;
+
+ toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
+ toaddr.sin_port = remote->port;
+
+ memset(&msg, 0, sizeof(message_t));
+ msg.magic = MAGIC_MSG;
+ msg.ident_id = id;
+ msg.value = msg_value + remote->outgoing.saved_value;
+ msg.weight = msg_weight + remote->outgoing.saved_weight;
+ msg.seqno = remote->outgoing.next_seqno;
+ msg.min_seqno = remote->outgoing.first_seqno;
+ msg.type = MSG;
+ msg.view = comm->gossip.view;
+ if(comm->gossip.membership == SWIM) {
+ swim_comm_t *swim_comm = (swim_comm_t *)comm->membership_state;
+ // piggy back an update
+ /* A randomly chosen pending update rides along; after it has been
+ * piggy-backed UPDATE_THRESHOLD times it is dropped from the list. */
+ if(swim_comm->count_updates > 0) {
+ int index = myrand() % swim_comm->count_updates;
+ update_t *pointer = swim_comm->updates;
+ update_t *prev_pointer = swim_comm->updates;
+ while(index != 0) {
+ prev_pointer = pointer;
+ pointer = pointer->next;
+ index--;
+ }
+ msg.update_present = TRUE;
+ msg.reachability = pointer->remote->reachability;
+ msg.incarnation = pointer->remote->incarnation;
+ msg.node.addr = pointer->remote->addr;
+ msg.node.port = pointer->remote->port;
+ printlog(LOG_DEBUG, "SWIM: Sending update about %s\n", inet_ntoa(*(struct in_addr *)&pointer->remote->addr));
+ pointer->count++;
+ if(swim_comm->updates != pointer && pointer->count == UPDATE_THRESHOLD) {
+ // pointer is not head
+ prev_pointer->next = pointer->next;
+ pointer->next = NULL;
+ free(pointer);
+ swim_comm->count_updates--;
+ } else if(pointer->count == UPDATE_THRESHOLD) {
+ // pointer is head of the list
+ swim_comm->updates = pointer->next;
+ pointer->next = NULL;
+ free(pointer);
+ swim_comm->count_updates--;
+ if(swim_comm->count_updates == 0) {
+ swim_comm->updates = NULL;
+ }
+ }
+ }
+ }
+ remote->outgoing.next_seqno++;
+ remote->outgoing.saved_value += msg_value;
+ remote->outgoing.saved_weight += msg_weight;
+ /* Represents the number of messages we have sent to this peer without hearing a message or ACK from it. */
+ remote->awol += 1;
+
+
+#ifdef ALLOW_PARTITION
+
+ if (do_partition && ((partition_set & (1 << targetid)) == 0)) {
+ printlog(LOG_DEBUG, "Partition: GOSSIP: ignoring targetid %d\n", targetid);
+ continue;
+ }
+
+#endif
+
+ message_to_nbo(&msg);
+
+ if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+ printlog(LOG_WARN, "WARN: limiter_send_gossip: sento failed.\n");
+ result = errno;
+ break;
+ }
+ printlog(LOG_DEBUG,"SWIM: sent the gossip to %s\n", inet_ntoa(*(struct in_addr *)&toaddr.sin_addr.s_addr));
+ }
+ swim_send(comm, id, sock);
+ comm->gossip.value = msg_value * message_portion;
+ comm->gossip.weight = msg_weight * message_portion;
+
+ return result;
+}
+
+/** View-change hook for the SWIM membership layer (installed as
+ *  comm->restart_function by swim_init).  Intentionally a no-op for now. */
+void swim_restart(comm_t *comm, int32_t view_number) {
+ /* Not sure about this yet... */
+}
+
+/** Initialize SWIM membership state on comm and install its callbacks.
+ *  Restores the node's incarnation number from /root/incarnation (0 when the
+ *  file is missing or unreadable) and persists incarnation+1 so a restarted
+ *  node always comes back with a strictly greater incarnation.
+ *  Returns 0 on success, ENOMEM if the state cannot be allocated. */
+int swim_init(comm_t *comm, uint32_t id) {
+    comm->membership_state = malloc(sizeof(swim_comm_t));
+    if (comm->membership_state == NULL) {
+        return ENOMEM;
+    }
+    /* BUGFIX: the state was never zeroed, so updates, count_updates and
+     * ping_targets started out as garbage pointers/counters. */
+    memset(comm->membership_state, 0, sizeof(swim_comm_t));
+    comm->connected = 1;
+    comm->recv_function = swim_receive;
+    comm->send_function = send_gossip_swim;
+    comm->restart_function = swim_restart;
+
+    swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
+
+    /* BUGFIX: the original opened the file with "w+", which truncates it
+     * before fscanf() runs, so the stored incarnation was never actually
+     * read back.  Read first, then rewrite; also guard fopen() failures. */
+    FILE *fp = fopen("/root/incarnation", "r");
+    if (fp != NULL) {
+        if (fscanf(fp, "%d", &swim_comm->incarnation) != 1) {
+            swim_comm->incarnation = 0;
+        }
+        fclose(fp);
+    }
+    fp = fopen("/root/incarnation", "w");
+    if (fp != NULL) {
+        fprintf(fp, "%d", swim_comm->incarnation + 1); // next time a greater incarnation would be read
+        fflush(fp);
+        fclose(fp);
+    }
+
+    return 0;
+}
+
+/** Release the SWIM membership state attached to comm, if any. */
+void swim_teardown(comm_t *comm) {
+    /* free(NULL) is a no-op, so no explicit guard is required. */
+    free(comm->membership_state);
+}
+
+/**Bhanu: new functions being introduced*/
+/** Given a friend_id and the address of the suspected node
+ * this function sends a CHECK message to friend */
+/*
+ int help_from_friend(comm_t* comm, int friend_id, in_addr_t addr, in_port_t port, uint32_t id, int sock) {
+// send a CHECK message to friend
+int result;
+message_t msg;
+memset(&msg, 0, sizeof(message_t));
+msg.magic = MAGIC_MSG;
+msg.ident_id = id;
+msg.value = 0;
+msg.weight = 0;
+msg.seqno = 0;
+msg.min_seqno = 0;
+msg.type = CHECK;
+msg.check_target = addr;
+msg.check_port = port;
+
+// send the message
+struct sockaddr_in toaddr;
+memset(&toaddr, 0, sizeof(struct sockaddr_in));
+toaddr.sin_family = AF_INET;
+toaddr.sin_addr.s_addr = comm->remote_limiters[friend_id].addr;
+toaddr.sin_port = comm->remote_limiters[friend_id].port;
+message_to_nbo(&msg);
+
+if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
+result = errno;
+printlog(LOG_WARN, " - The error was |%d|\n", strerror(result));
+}
+}
+*/
--- /dev/null
+/* See the DRL-LICENSE file for this file's software license. */
+
+/* Declarations for the SWIM-style failure-detection membership layer. */
+#ifndef _SWIM_H_
+#define _SWIM_H_
+
+/* Verdicts carried in a CHECK_ACK's checkack_value field. */
+#define AWOL (0)
+#define ALIVE (1)
+/* Number of friends asked to verify a suspect (presumably consumed by
+ * help_from_friends — confirm in the peer-comm code). */
+#define MAX_FRIENDS (5)
+/* Times an update is piggy-backed on gossip before being dropped. */
+#define UPDATE_THRESHOLD (3)
+/* Gossip rounds a friend waits for a PING_ACK before reporting AWOL. */
+#define FRIEND_THRESHOLD (4)
+/* Gossip rounds a source keeps a peer in SUSPECT before deciding. */
+#define SOURCE_THRESHOLD (8)
+
+/* SWIM failure-detection message types (MSG/ACK are defined elsewhere). */
+static const uint16_t CHECK = 3;
+static const uint16_t CHECK_ACK = 4;
+static const uint16_t PING = 5;
+static const uint16_t PING_ACK = 6;
+
+/** Structs needed to maintain the list of nodes to which
+* ping messages have been sent. This list is maintained
+* by one of the "friend" nodes
+*/
+typedef struct ping_target {
+ // node which has been targetted
+ remote_node_t target;
+ // node which requested the ping target
+ // so that CHECK_ACK could be sent with ALIVE / AWOL
+ remote_node_t source;
+ // count of gossip rounds to keep timeout
+ uint32_t count;
+ // this has to be a list as well because we have to
+ // remember all the suspects a particular node has
+ // pinged and is waiting for a response
+ struct ping_target *next;
+} ping_target_t;
+
+/** A pending membership update, piggy-backed on outgoing gossip. */
+typedef struct update {
+ /*Remote limiter whose update this is*/
+ remote_limiter_t *remote;
+ /*Number of times, update has been piggy
+ *backed on a gossip message*/
+ int count;
+ struct update *next;
+} update_t;
+
+/** Per-comm SWIM state, stored in comm->membership_state. */
+typedef struct swim_comm {
+ /*Incarnation number of the node*/
+ uint32_t incarnation;
+
+ /*List of updates currently held by the node*/
+ update_t *updates;
+ int count_updates;
+
+ /** Keep track of the ping messages being sent and wait for timeout */
+ ping_target_t *ping_targets;
+} swim_comm_t;
+
+/* Set up SWIM state on comm; restores the incarnation from disk. */
+int swim_init(comm_t *comm, uint32_t id);
+
+/* Free comm->membership_state. */
+void swim_teardown(comm_t *comm);
+
+/* Dispatch a received SWIM packet (gossip or failure detection). */
+int swim_receive(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg);
+
+/* Send one gossip round plus SWIM maintenance traffic. */
+int send_gossip_swim(comm_t *comm, uint32_t id, int sock);
+
+/* View-change hook (currently a no-op). */
+void swim_restart(comm_t *comm, int32_t view_number);
+
+#endif /* _SWIM_H_ */
.u = { .value = 0xfffffff },
};
-static config_entry_t netem_slice = {
+static config_entry_t sfq_slice = {
.next = &partition,
+ .key = "sfq_slice",
+ .type = CONFIG_TYPE_STRING,
+ .options = CONFIG_OPT_NONE,
+ .u = { .string = "NONE" },
+};
+
+static config_entry_t netem_slice = {
+ .next = &sfq_slice,
.key = "netem_slice",
.type = CONFIG_TYPE_STRING,
.options = CONFIG_OPT_NONE,
extern uint8_t system_loglevel;
extern uint8_t do_enforcement;
-/* From peer_comm.c - used to simulate partition. */
-extern int do_partition;
-extern int partition_set;
+/* Used to simulate partitions. */
+int do_partition = 0;
+int partition_set = 0xfffffff;
/* functions */
return execute_cmd(cmd);
}
+/** Replace the qdisc at parent_major:parent_minor on iface with an SFQ qdisc
+ *  using the given handle and perturb interval.  The existing pfifo qdisc is
+ *  deleted first (a failed deletion is logged but not fatal — it may simply
+ *  not exist).  Returns the exit status of the tc replace command. */
+static inline int add_htb_sfq(const char *iface, const uint32_t parent_major,
+                              const uint32_t parent_minor, const uint32_t handle,
+                              const int perturb) {
+    char cmd[300];
+
+    /* BUGFIX: use snprintf so a long interface name cannot overflow cmd. */
+    snprintf(cmd, sizeof(cmd), "/sbin/tc qdisc del dev %s parent %x:%x handle %x pfifo", iface, parent_major,
+             parent_minor, handle);
+    printlog(LOG_WARN, "HTB_cmd: %s\n", cmd);
+    if (execute_cmd(cmd))
+        printlog(LOG_WARN, "HTB_cmd: Previous deletion did not succeed.\n");
+
+    snprintf(cmd, sizeof(cmd), "/sbin/tc qdisc replace dev %s parent %x:%x handle %x sfq perturb %d",
+             iface, parent_major, parent_minor, handle, perturb);
+    printlog(LOG_WARN, "HTB_cmd: %s\n", cmd);
+    return execute_cmd(cmd);
+}
+
static int create_htb_hierarchy(drl_instance_t *instance) {
char cmd[300];
int i, j, k;
}
}
+    /* Turn on SFQ for experimentation.  sfq_slice is "NONE" (disabled, the
+     * default), "ALL" (attach SFQ to the root/default classes and every
+     * leaf), or the hex xid of a single slice. */
+    if (strcmp(sfq_slice.u.string, "NONE")) {
+        if (!strcmp(sfq_slice.u.string, "ALL")) {
+            if (add_htb_sfq("eth0", 1, 0x1000, 0x1000, 30))
+                return 1;
+            if (add_htb_sfq("eth0", 1, 0x1fff, 0x1fff, 30))
+                return 1;
+
+            for (k = 0; k < instance->leaf_count; ++k) {
+                if (add_htb_sfq("eth0", 1, (0x1000 | instance->leaves[k].xid),
+                                (0x1000 | instance->leaves[k].xid), 30)) {
+                    return 1;
+                }
+            }
+        } else {
+            uint32_t slice_xid;
+
+            /* Bug fix: the sscanf result was previously ignored, so a
+             * malformed sfq_slice value left slice_xid uninitialized before
+             * use below. */
+            if (sscanf(sfq_slice.u.string, "%x", &slice_xid) != 1) {
+                printlog(LOG_WARN, "HTB_cmd: bad sfq_slice value '%s'\n",
+                         sfq_slice.u.string);
+                return 1;
+            }
+
+            if (add_htb_sfq("eth0", 1, slice_xid, slice_xid, 30))
+                return 1;
+        }
+    }
+
return 0;
}
--- /dev/null
+/* See the DRL-LICENSE file for this file's software license. */
+
+/* zk_drl.c: zookeeper-backed group membership and view management for DRL.
+ * Compiled only when zookeeper support is enabled. */
+
+#ifdef BUILD_ZOOKEEPER
+
+#include <arpa/inet.h>
+#include <assert.h>
+#include <inttypes.h>
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>   /* memset() -- was previously only available transitively */
+#include <unistd.h>
+
+#include "raterouter.h"
+#include "ratetypes.h"
+#include "drl_state.h"
+#include "peer_comm.h"
+#include "zk_drl.h"
+#include "logging.h"
+
+/* NULL_LEN (-1) tells zoo_create() the znode carries no data payload. */
+#define NULL_LEN (-1)
+#define PATH_LEN (64)
+#define PATH_BUFFER_LEN (64)
+
+/* Return the child-list version (cversion) of the znode at <path>, or -1 if
+ * the node cannot be stat'ed.  The group node's cversion is used by this
+ * module as the DRL view number. */
+static int32_t read_path_cversion(zhandle_t *zkhandle, const char *path) {
+    struct Stat stat;
+    int zoo_result;
+
+    memset(&stat, 0, sizeof(struct Stat));
+
+    /* Watch flag 0: we only want the current stat, no notification. */
+    zoo_result = zoo_exists(zkhandle, path, 0, &stat);
+
+    if (zoo_result != ZOK) {
+        return -1;
+    }
+
+    return stat.cversion;
+}
+
+/* Re-read the membership (child) list of the group node <path> and update
+ * the remote limiter table to match.  The group node's cversion doubles as
+ * the view number: the loop retries until the cversion is identical before
+ * and after zoo_get_children(), guaranteeing the child list corresponds to
+ * that view.  The zoo_get_children() call also re-arms the child watch
+ * (watch flag 1).  Returns ZOK or a zookeeper error code. */
+static int process_membership_change(zhandle_t *zkhandle, zkdrlcontext_t *context, const char *path) {
+    struct String_vector children;
+    int32_t view_before = 0;
+    int32_t view_after = view_before + 1; //Needs to be != to view_before
+    int zoo_result = 0;
+    int i;
+
+    memset(&children, 0, sizeof(children));
+
+    while (view_before != view_after) {
+        /* Bug fix: free the list fetched by a previous, inconsistent
+         * iteration before zoo_get_children() overwrites it (was a leak). */
+        deallocate_String_vector(&children);
+
+        view_before = read_path_cversion(zkhandle, path);
+
+        zoo_result = zoo_get_children(zkhandle, path, 1, &children);
+        if (zoo_result != ZOK) {
+            return zoo_result;
+        }
+
+        view_after = read_path_cversion(zkhandle, path);
+    }
+
+    if (view_after > context->comm->gossip.view) {
+        printlog(LOG_DEBUG, "ZK:zookeeper watch says we need to restart with a new view.\n");
+        context->comm->restart_function(context->comm, view_after);
+    }
+
+    /* Clear the remote limiter list. This will be overwritten below for
+     * limiters that are found to be in the new view. */
+    for (i = 0; i < context->comm->remote_node_count; ++i) {
+        context->comm->remote_limiters[i].reachability = UNREACHABLE;
+        context->comm->remote_limiters[i].view = view_after;
+        context->comm->remote_limiters[i].view_confidence = NOTIN;
+    }
+
+    for (i = 0; i < children.count; ++i) {
+        remote_limiter_t *remote_limiter = NULL;
+        remote_node_t remote_node;
+
+        memset(&remote_node, 0, sizeof(remote_node_t));
+
+        printlog(LOG_DEBUG, "ZK:children.data[%d] is %s\n", i, children.data[i]);
+
+        /* Member znode names are the peer's address printed with %u. */
+        sscanf(children.data[i], "%u", &remote_node.addr);
+        remote_node.port = htons(LIMITER_LISTEN_PORT);
+
+        if (remote_node.addr != context->local_addr) {
+            printlog(LOG_DEBUG, "ZK:searching map for %u:%u\n", remote_node.addr, remote_node.port);
+            remote_limiter = map_search(context->comm->remote_node_map, &remote_node, sizeof(remote_node_t));
+            assert(remote_limiter != NULL);
+            remote_limiter->reachability = REACHABLE;
+            remote_limiter->view_confidence = IN;
+        } else {
+            printlog(LOG_DEBUG, "ZK: %u is my own addr.\n", remote_node.addr);
+        }
+    }
+
+    /* Bug fix: release the child list allocated by zoo_get_children(). */
+    deallocate_String_vector(&children);
+
+    assert(view_after >= 0);
+
+    context->comm->connected = 1;
+
+    return ZOK;
+}
+
+/* Session-established handler: (re)register this limiter in its group.
+ * Creates the group node "/<id>" (errors -- typically "already exists" --
+ * are logged and ignored), creates the ephemeral member node
+ * "/<id>/<local_addr>" which zookeeper removes automatically if our session
+ * dies, then reads the membership list and re-arms the child watch via
+ * process_membership_change().  Called from zk_drl_watcher with the limiter
+ * and comm locks held. */
+static void zk_connected(zhandle_t *zkhandle, zkdrlcontext_t *context) {
+    char path[PATH_LEN];
+    char path_buffer[PATH_BUFFER_LEN];
+    int zoo_result = 0;
+
+    printlog(LOG_DEBUG, "ZK:(Re)Connected to zookeeper.\n");
+
+    sprintf(path, "/%u", context->id);
+
+    /* NULL data with NULL_LEN (-1): the group node carries no payload. */
+    zoo_result = zoo_create(zkhandle, path, NULL, NULL_LEN, &ZOO_OPEN_ACL_UNSAFE, 0, path_buffer, PATH_BUFFER_LEN);
+
+    if (zoo_result == ZOK) {
+        printlog(LOG_DEBUG, "ZK: created path %s\n", path);
+    } else {
+        //An error occurred. It was probably already there.
+        printlog(LOG_DEBUG, "ZK: error creating path %s: %d\n", path, zoo_result);
+    }
+
+    sprintf(path, "/%u/%u", context->id, context->local_addr);
+
+    /* Ephemeral node: its presence under the group node IS our membership. */
+    zoo_result = zoo_create(zkhandle, path, NULL, NULL_LEN, &ZOO_READ_ACL_UNSAFE, ZOO_EPHEMERAL, path_buffer, PATH_BUFFER_LEN);
+
+    if (zoo_result == ZOK) {
+        printlog(LOG_DEBUG, "ZK: created path %s\n", path);
+    } else {
+        printlog(LOG_DEBUG, "ZK: error creating path %s: %d\n", path, zoo_result);
+    }
+
+    sprintf(path, "/%u", context->id);
+
+    zoo_result = process_membership_change(zkhandle, context, path);
+
+    if (zoo_result != ZOK) {
+        printlog(LOG_WARN, "ZK: process_membership_change failed?\n");
+    }
+}
+
+/* Session interrupted (ZOO_CONNECTING_STATE): clear comm->connected so the
+ * rest of the limiter knows the membership layer is down until the session
+ * is re-established (zk_connected / process_membership_change set it back). */
+static void zk_disconnected(zhandle_t *zkhandle, zkdrlcontext_t *context) {
+    printlog(LOG_DEBUG, "ZK:Disconnected from zookeeper.\n");
+
+    context->comm->connected = 0;
+}
+
+/* Child-watch handler: the group's member list changed; re-read it and
+ * re-arm the watch.  Called from zk_drl_watcher with the locks held. */
+static void zk_membership_change(zhandle_t *zkhandle, const char *path, zkdrlcontext_t *context) {
+    int zoo_result = 0;
+
+    printlog(LOG_DEBUG, "ZK:zookeeper child list changed.\n");
+
+    zoo_result = process_membership_change(zkhandle, context, path);
+
+    /* Previously the result was silently discarded; log failures the same
+     * way zk_connected does. */
+    if (zoo_result != ZOK) {
+        printlog(LOG_WARN, "ZK: process_membership_change failed: %d\n", zoo_result);
+    }
+}
+
+/* comm->restart_function: move this limiter into view <view_number>.
+ * Resets our own gossip share to (local_rate, weight 1.0) and, for every
+ * peer still stamped with an older view, clears its per-peer gossip state
+ * and marks its membership UNSURE until the new view is confirmed. */
+void zk_drl_restart(comm_t *comm, int32_t view_number) {
+    int i;
+
+    comm->gossip.value = comm->local_rate;
+    comm->gossip.weight = 1.0;
+    comm->gossip.view = view_number;
+
+    for (i = 0; i < comm->remote_node_count; ++i) {
+        if (comm->remote_limiters[i].view < view_number) {
+            comm->remote_limiters[i].rate = 0;
+            memset(&comm->remote_limiters[i].incoming, 0, sizeof(in_neighbor_t));
+            memset(&comm->remote_limiters[i].outgoing, 0, sizeof(out_neighbor_t));
+            comm->remote_limiters[i].view = view_number;
+            comm->remote_limiters[i].view_confidence = UNSURE;
+        }
+    }
+
+    printlog(LOG_DEBUG, "ZK: Changing view to %d\n", view_number);
+}
+
+/* Global zookeeper watcher callback (runs on the zookeeper client thread).
+ * Takes the limiter read lock and the comm mutex so the handlers below may
+ * touch shared gossip/membership state, then dispatches: session events
+ * drive connect/disconnect/expiry handling; child events drive membership
+ * re-reads.
+ * NOTE(review): on session expiry a replacement handle is created but the
+ * expired one is never closed -- looks like a leak of the old zhandle;
+ * confirm against the zookeeper C API docs. */
+static void zk_drl_watcher(zhandle_t *zkhandle, int type, int state, const char *path, void *context_ptr) {
+    zkdrlcontext_t *context = (zkdrlcontext_t *) context_ptr;
+
+    pthread_rwlock_rdlock(context->limiter_lock);
+    pthread_mutex_lock(&context->comm->lock);
+
+    if (type == ZOO_SESSION_EVENT) {
+        if (state == ZOO_CONNECTED_STATE) {
+            /* We're newly connected - set that watch! */
+            zk_connected(zkhandle, context);
+        } else if (state == ZOO_CONNECTING_STATE) {
+            /* We're no longer connected. Do something safe. */
+            zk_disconnected(zkhandle, context);
+        } else if (state == ZOO_EXPIRED_SESSION_STATE) {
+            printlog(LOG_DEBUG, "ZK:zookeeper session expired - reconnecting.\n");
+            context->zkhandle = zookeeper_init(context->zk_host, zk_drl_watcher, 10000, NULL, context, 0);
+        } else {
+            printlog(LOG_DEBUG, "ZK:Unhandled event zk_drl_watcher: type is %d, state is %d, path is %s\n", type, state, path);
+        }
+    } else if (type == ZOO_CHILD_EVENT) {
+        /* The list of child nodes in the group has changed. Re-read the
+         * group membership list and re-set the watch. */
+        zk_membership_change(zkhandle, path, context);
+    } else {
+        printlog(LOG_DEBUG, "ZK:Unhandled event zk_drl_watcher: type is %d, state is %d, path is %s\n", type, state, path);
+    }
+
+    pthread_mutex_unlock(&context->comm->lock);
+    pthread_rwlock_unlock(context->limiter_lock);
+}
+
+/* Gossip receive handler (installed as comm->recv_function).
+ * Implements the view-stamped gossip protocol over four message types:
+ *   MSG       - a peer's (value, weight) share tagged with a view number and
+ *               a [min_seqno, seqno] sequence range; only the not-yet-seen
+ *               portion is folded into comm->gossip, and an ack is returned.
+ *   ACK       - confirms our most recent outgoing message; clears the saved
+ *               retransmit state and the awol counter.
+ *   UNSUREACK - ack from a peer that cannot tell whether we belong to its
+ *               (newer) view.
+ *   NACK      - ack from a peer that believes we are NOT in its newer view;
+ *               clears comm->connected.
+ * Any stamp newer than comm->gossip.view triggers comm->restart_function.
+ * The sock parameter is unused here (kept for the recv_function signature).
+ * Always returns 0. */
+int zk_drl_recv(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) {
+    if (msg->type == ACK) {
+        /* If ACK was received then reset the awol count */
+        if (msg->view == comm->gossip.view && msg->seqno == remote->outgoing.next_seqno - 1) {
+            /* Ack for most recent message. Clear saved state. */
+            remote->outgoing.first_seqno = remote->outgoing.next_seqno;
+            remote->outgoing.saved_value = 0;
+            remote->outgoing.saved_weight = 0;
+            remote->awol = 0;
+
+        } else if (msg->view > comm->gossip.view) {
+            printlog(LOG_DEBUG, "ZK:Received ack for newer view, restarting.\n");
+            comm->restart_function(comm, msg->view);
+            remote->view_confidence = IN;
+            remote->awol = 0;
+        }
+        /* Ignore ack if it isn't for most recent message or its from an old view. */
+    } else if (msg->type == MSG) {
+        if (msg->view == comm->gossip.view) {
+            if (msg->min_seqno > remote->incoming.seen_seqno) {
+                /* Entirely new information */
+                remote->incoming.seen_seqno = msg->seqno;
+                remote->incoming.saved_value = msg->value;
+                remote->incoming.saved_weight = msg->weight;
+                comm->gossip.value += msg->value;
+                comm->gossip.weight += msg->weight;
+                send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+                remote->awol = 0;
+            }
+            else if (msg->seqno > remote->incoming.seen_seqno) {
+                /* Only some of the message is old news. */
+                double diff_value = msg->value - remote->incoming.saved_value;
+                double diff_weight = msg->weight - remote->incoming.saved_weight;
+
+                remote->incoming.seen_seqno = msg->seqno;
+                remote->incoming.saved_value = msg->value;
+                remote->incoming.saved_weight = msg->weight;
+
+                /* Fold in only the delta beyond what was already counted. */
+                comm->gossip.value += diff_value;
+                comm->gossip.weight += diff_weight;
+                send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+                remote->awol = 0;
+            }
+            else {
+                /* The entire message is old news. (Duplicate). */
+                /* Do nothing. */
+            }
+        } else if (msg->view > comm->gossip.view) {
+            printlog(LOG_DEBUG, "ZK:received message with a newer viewstamp, restarting.\n");
+            comm->restart_function(comm, msg->view);
+            remote->view_confidence = IN;
+
+            /* First message of the new view: take its payload wholesale. */
+            remote->incoming.seen_seqno = msg->seqno;
+            remote->incoming.saved_value = msg->value;
+            remote->incoming.saved_weight = msg->weight;
+            comm->gossip.value += msg->value;
+            comm->gossip.weight += msg->weight;
+            send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+            remote->awol = 0;
+        } else if (msg->view < comm->gossip.view) {
+            printlog(LOG_DEBUG, "ZK:received a message with an older viewstamp.\n");
+            if (remote->view_confidence == IN) {
+                /* The sender is in the new view and doesn't know it yet. */
+                send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+            } else if (remote->view_confidence == UNSURE) {
+                /* We don't know if he's in or not. */
+                send_ack(id, remote, msg->seqno, UNSUREACK, comm->gossip.view);
+            } else if (remote->view_confidence == NOTIN) {
+                /* He's out of luck... */
+                send_ack(id, remote, msg->seqno, NACK, comm->gossip.view);
+            }
+            remote->awol = 0;
+        }
+    } else if (msg->type == UNSUREACK) {
+        /* We received an ack, but the ack sender was unsure whether or not
+         * we'll be a part of its new view. Can't do much here... */
+        if (msg->view > comm->gossip.view) {
+            remote->view = msg->view;
+            remote->view_confidence = IN;
+            remote->awol = 0;
+            printlog(LOG_DEBUG, "ZK:received an UNSUREACK for view %d\n", msg->view);
+        }
+    } else if (msg->type == NACK) {
+        if (msg->view > comm->gossip.view) {
+            remote->view = msg->view;
+            remote->view_confidence = IN;
+            remote->awol = 0;
+
+            /* Excluded from the new view: drop connected status until
+             * zookeeper re-admits us. */
+            comm->connected = 0;
+            printlog(LOG_DEBUG, "ZK:received a NACK for view %d\n", msg->view);
+        }
+    }
+
+    return 0;
+}
+
+/* Set up the zookeeper-backed membership layer for this limiter.
+ * Allocates the zkdrlcontext_t, publishes it via comm->membership_state
+ * (before zookeeper_init, so zk_drl_close() can find it even after a
+ * partial failure), opens the zookeeper session with zk_drl_watcher as the
+ * global watcher (10000 ms recv timeout), and installs the
+ * recv/send/restart callbacks.  Returns 0 on success, ENOMEM or EINVAL.
+ * NOTE(review): zk_drl_close() frees context->zk_host, i.e. ownership of
+ * config->zk_host transfers here -- confirm no caller frees it as well. */
+int zk_drl_init(comm_t *comm, uint32_t id, limiter_t *limiter, ident_config *config) {
+    zkdrlcontext_t *context = NULL;
+    comm->connected = 0;
+
+    if ((context = malloc(sizeof(zkdrlcontext_t))) == NULL) {
+        return ENOMEM;
+    }
+
+    context->zk_host = config->zk_host;
+    context->limiter_lock = &limiter->limiter_lock;
+    context->comm = comm;
+    context->id = id;
+    context->local_addr = limiter->localaddr;
+    comm->membership_state = context;
+
+    printlog(LOG_DEBUG, "ZK: Calling zk init\n");
+
+    context->zkhandle = zookeeper_init(context->zk_host, zk_drl_watcher, 10000, NULL, context, 0);
+
+    if (context->zkhandle == NULL) {
+        printlog(LOG_CRITICAL, "ZK: docs say that this can fail, but they don't say why. :( Errno is %d\n", errno);
+        return EINVAL;
+    }
+
+    comm->recv_function = zk_drl_recv;
+    comm->send_function = send_udp_gossip;
+    comm->restart_function = zk_drl_restart;
+
+    return 0;
+}
+
+/* Tear down the zookeeper membership layer: close the session handle and
+ * release the context allocated by zk_drl_init().  Safe to call when
+ * membership_state is NULL.  Always returns 0. */
+int zk_drl_close(comm_t *comm) {
+    zkdrlcontext_t *context = (zkdrlcontext_t *) comm->membership_state;
+
+    /* Bug fix: the handle was previously dereferenced through 'context'
+     * BEFORE the NULL checks below, crashing when init never ran or failed
+     * early. */
+    if (context == NULL) {
+        return 0;
+    }
+
+    if (context->zkhandle != NULL) {
+        zookeeper_close(context->zkhandle);
+        context->zkhandle = NULL;
+    }
+
+    if (context->zk_host != NULL) {
+        free(context->zk_host);
+        context->zk_host = NULL;
+    }
+
+    free(context);
+    comm->membership_state = NULL;
+
+    return 0;
+}
+
+#endif /* BUILD_ZOOKEEPER */
--- /dev/null
+/* See the DRL-LICENSE file for this file's software license. */
+
+/* zk_drl.h: zookeeper-backed group membership for DRL gossip.  Declares the
+ * per-limiter zookeeper context and the init/close/recv/restart entry
+ * points that zk_drl_init() installs into comm_t. */
+
+#ifndef _ZK_DRL_
+#define _ZK_DRL_
+
+#define _XOPEN_SOURCE 600
+
+#include <inttypes.h>
+#include <pthread.h>
+#include <sys/types.h>
+
+#include <zookeeper.h>
+
+/* Extra message types used by the view-change handshake.  Presumably these
+ * extend the MSG/ACK type codes defined with message_t -- TODO confirm the
+ * values don't collide. */
+static const uint16_t UNSUREACK = 3;
+static const uint16_t NACK = 4;
+
+typedef struct zkdrlcontext {
+    /** The host string that should be passed to zookeeper_init when using
+     * zookeeper. This consists of comma-separated ipaddr:port pairs. Example:
+     * "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" */
+    char *zk_host;
+
+    /* Borrowed pointers back into the owning limiter/comm state. */
+    pthread_rwlock_t *limiter_lock;
+    comm_t *comm;
+    uint32_t id;          /* identity (group) id; becomes the "/<id>" znode */
+    in_addr_t local_addr; /* our address; becomes the ephemeral member znode */
+
+    zhandle_t *zkhandle;  /* live zookeeper session handle */
+} zkdrlcontext_t;
+
+int zk_drl_init(comm_t *comm, uint32_t id, limiter_t *limiter, ident_config *config);
+
+int zk_drl_close(comm_t *comm);
+
+int zk_drl_recv(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg);
+
+void zk_drl_restart(comm_t *comm, int32_t view_number);
+
+#endif /* _ZK_DRL_ */
--- /dev/null
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __RECORDIO_H__
+#define __RECORDIO_H__
+
+/* NOTE(review): vendored from the Apache ZooKeeper C client (recordio.h).
+ * Keep in sync with the bundled libzookeeper; prefer re-syncing from
+ * upstream over local edits. */
+
+#include <sys/types.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* Length-prefixed byte buffer used for serialized payloads. */
+struct buffer {
+    int32_t len;
+    char *buff;
+};
+
+void deallocate_String(char **s);
+void deallocate_Buffer(struct buffer *b);
+void deallocate_vector(void *d);
+/* Input (deserialization) archive: a vtable of readers over a buffer. */
+struct iarchive {
+    int (*start_record)(struct iarchive *ia, const char *tag);
+    int (*end_record)(struct iarchive *ia, const char *tag);
+    int (*start_vector)(struct iarchive *ia, const char *tag, int32_t *count);
+    int (*end_vector)(struct iarchive *ia, const char *tag);
+    int (*deserialize_Bool)(struct iarchive *ia, const char *name, int32_t *);
+    int (*deserialize_Int)(struct iarchive *ia, const char *name, int32_t *);
+    int (*deserialize_Long)(struct iarchive *ia, const char *name, int64_t *);
+    int (*deserialize_Buffer)(struct iarchive *ia, const char *name,
+            struct buffer *);
+    int (*deserialize_String)(struct iarchive *ia, const char *name, char **);
+    void *priv;
+};
+/* Output (serialization) archive: a vtable of writers over a buffer. */
+struct oarchive {
+    int (*start_record)(struct oarchive *oa, const char *tag);
+    int (*end_record)(struct oarchive *oa, const char *tag);
+    int (*start_vector)(struct oarchive *oa, const char *tag, const int32_t *count);
+    int (*end_vector)(struct oarchive *oa, const char *tag);
+    int (*serialize_Bool)(struct oarchive *oa, const char *name, const int32_t *);
+    int (*serialize_Int)(struct oarchive *oa, const char *name, const int32_t *);
+    int (*serialize_Long)(struct oarchive *oa, const char *name,
+            const int64_t *);
+    int (*serialize_Buffer)(struct oarchive *oa, const char *name,
+            const struct buffer *);
+    int (*serialize_String)(struct oarchive *oa, const char *name, char **);
+    void *priv;
+};
+
+struct oarchive *create_buffer_oarchive(void);
+void close_buffer_oarchive(struct oarchive **oa, int free_buffer);
+struct iarchive *create_buffer_iarchive(char *buffer, int len);
+void close_buffer_iarchive(struct iarchive **ia);
+char *get_buffer(struct oarchive *);
+int get_buffer_len(struct oarchive *);
+
+int64_t htonll(int64_t v);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
--- /dev/null
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ZOOKEEPER_H_
+#define ZOOKEEPER_H_
+
+#include <sys/time.h>
+#include <stdio.h>
+
+#include "zookeeper_version.h"
+#include "recordio.h"
+#include "zookeeper.jute.h"
+
+/**
+ * \file zookeeper.h
+ * \brief ZooKeeper functions and definitions.
+ *
+ * ZooKeeper is a network service that may be backed by a cluster of
+ * synchronized servers. The data in the service is represented as a tree
+ * of data nodes. Each node has data, children, an ACL, and status information.
+ * The data for a node is read and write in its entirety.
+ *
+ * ZooKeeper clients can leave watches when they query the data or children
+ * of a node. If a watch is left, that client will be notified of the change.
+ * The notification is a one time trigger. Subsequent changes to the node will
+ * not trigger a notification unless the client issues a query with the watch
+ * flag set. If the client is ever disconnected from the service, even if the
+ * disconnection is temporary, the watches of the client will be removed from
+ * the service, so a client must treat a disconnect notification as an implicit
+ * trigger of all outstanding watches.
+ *
+ * When a node is created, it may be flagged as an ephemeral node. Ephemeral
+ * nodes are automatically removed when a client session is closed or when
+ * a session times out due to inactivity (the ZooKeeper runtime fills in
+ * periods of inactivity with pings). Ephemeral nodes cannot have children.
+ *
+ * ZooKeeper clients are identified by a server assigned session id. For
+ * security reasons, the server
+ * also generates a corresponding password for a session. A client may save its
+ * id and corresponding password to persistent storage in order to use the
+ * session across program invocation boundaries.
+ */
+
+/* Support for building on various platforms */
+
+// on cygwin we should take care of exporting/importing symbols properly
+#ifdef DLL_EXPORT
+# define ZOOAPI __declspec(dllexport)
+#else
+# if defined(__CYGWIN__) && !defined(USE_STATIC_LIB)
+# define ZOOAPI __declspec(dllimport)
+# else
+# define ZOOAPI
+# endif
+#endif
+
+/** zookeeper return constants **/
+
+enum ZOO_ERRORS {
+ ZOK = 0, /*!< Everything is OK */
+
+ /** System and server-side errors.
+ * This is never thrown by the server, it shouldn't be used other than
+ * to indicate a range. Specifically error codes greater than this
+ * value, but lesser than {@link #ZAPIERROR}, are system errors. */
+ ZSYSTEMERROR = -1,
+ ZRUNTIMEINCONSISTENCY = -2, /*!< A runtime inconsistency was found */
+ ZDATAINCONSISTENCY = -3, /*!< A data inconsistency was found */
+ ZCONNECTIONLOSS = -4, /*!< Connection to the server has been lost */
+ ZMARSHALLINGERROR = -5, /*!< Error while marshalling or unmarshalling data */
+ ZUNIMPLEMENTED = -6, /*!< Operation is unimplemented */
+ ZOPERATIONTIMEOUT = -7, /*!< Operation timeout */
+ ZBADARGUMENTS = -8, /*!< Invalid arguments */
+  ZINVALIDSTATE = -9, /*!< Invalid zhandle state */
+
+ /** API errors.
+ * This is never thrown by the server, it shouldn't be used other than
+ * to indicate a range. Specifically error codes greater than this
+ * value are API errors (while values less than this indicate a
+ * {@link #ZSYSTEMERROR}).
+ */
+ ZAPIERROR = -100,
+ ZNONODE = -101, /*!< Node does not exist */
+ ZNOAUTH = -102, /*!< Not authenticated */
+ ZBADVERSION = -103, /*!< Version conflict */
+ ZNOCHILDRENFOREPHEMERALS = -108, /*!< Ephemeral nodes may not have children */
+ ZNODEEXISTS = -110, /*!< The node already exists */
+ ZNOTEMPTY = -111, /*!< The node has children */
+ ZSESSIONEXPIRED = -112, /*!< The session has been expired by the server */
+ ZINVALIDCALLBACK = -113, /*!< Invalid callback specified */
+ ZINVALIDACL = -114, /*!< Invalid ACL specified */
+ ZAUTHFAILED = -115, /*!< Client authentication failed */
+ ZCLOSING = -116, /*!< ZooKeeper is closing */
+ ZNOTHING = -117, /*!< (not error) no server responses to process */
+ ZSESSIONMOVED = -118 /*!<session moved to another server, so operation is ignored */
+};
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+* @name Debug levels
+*/
+typedef enum {ZOO_LOG_LEVEL_ERROR=1,ZOO_LOG_LEVEL_WARN=2,ZOO_LOG_LEVEL_INFO=3,ZOO_LOG_LEVEL_DEBUG=4} ZooLogLevel;
+
+/**
+ * @name ACL Consts
+ */
+extern ZOOAPI const int ZOO_PERM_READ;
+extern ZOOAPI const int ZOO_PERM_WRITE;
+extern ZOOAPI const int ZOO_PERM_CREATE;
+extern ZOOAPI const int ZOO_PERM_DELETE;
+extern ZOOAPI const int ZOO_PERM_ADMIN;
+extern ZOOAPI const int ZOO_PERM_ALL;
+
+/** This Id represents anyone. */
+extern ZOOAPI struct Id ZOO_ANYONE_ID_UNSAFE;
+/** This Id is only usable to set ACLs. It will get substituted with the
+ * Id's the client authenticated with.
+ */
+extern ZOOAPI struct Id ZOO_AUTH_IDS;
+
+/** This is a completely open ACL*/
+extern ZOOAPI struct ACL_vector ZOO_OPEN_ACL_UNSAFE;
+/** This ACL gives the world the ability to read. */
+extern ZOOAPI struct ACL_vector ZOO_READ_ACL_UNSAFE;
+/** This ACL gives the creators authentication id's all permissions. */
+extern ZOOAPI struct ACL_vector ZOO_CREATOR_ALL_ACL;
+
+/**
+ * @name Interest Consts
+ * These constants are used to express interest in an event and to
+ * indicate to zookeeper which events have occurred. They can
+ * be ORed together to express multiple interests. These flags are
+ * used in the interest and event parameters of
+ * \ref zookeeper_interest and \ref zookeeper_process.
+ */
+// @{
+extern ZOOAPI const int ZOOKEEPER_WRITE;
+extern ZOOAPI const int ZOOKEEPER_READ;
+// @}
+
+/**
+ * @name Create Flags
+ *
+ * These flags are used by zoo_create to affect node create. They may
+ * be ORed together to combine effects.
+ */
+// @{
+extern ZOOAPI const int ZOO_EPHEMERAL;
+extern ZOOAPI const int ZOO_SEQUENCE;
+// @}
+
+/**
+ * @name State Consts
+ * These constants represent the states of a zookeeper connection. They are
+ * possible parameters of the watcher callback.
+ */
+// @{
+extern ZOOAPI const int ZOO_EXPIRED_SESSION_STATE;
+extern ZOOAPI const int ZOO_AUTH_FAILED_STATE;
+extern ZOOAPI const int ZOO_CONNECTING_STATE;
+extern ZOOAPI const int ZOO_ASSOCIATING_STATE;
+extern ZOOAPI const int ZOO_CONNECTED_STATE;
+// @}
+
+/**
+ * @name Watch Types
+ * These constants indicate the event that caused the watch event. They are
+ * possible values of the first parameter of the watcher callback.
+ */
+// @{
+/**
+ * \brief a node has been created.
+ *
+ * This is only generated by watches on non-existent nodes. These watches
+ * are set using \ref zoo_exists.
+ */
+extern ZOOAPI const int ZOO_CREATED_EVENT;
+/**
+ * \brief a node has been deleted.
+ *
+ * This is only generated by watches on nodes. These watches
+ * are set using \ref zoo_exists and \ref zoo_get.
+ */
+extern ZOOAPI const int ZOO_DELETED_EVENT;
+/**
+ * \brief a node has changed.
+ *
+ * This is only generated by watches on nodes. These watches
+ * are set using \ref zoo_exists and \ref zoo_get.
+ */
+extern ZOOAPI const int ZOO_CHANGED_EVENT;
+/**
+ * \brief a change has occurred in the list of children.
+ *
+ * This is only generated by watches on the child list of a node. These watches
+ * are set using \ref zoo_get_children.
+ */
+extern ZOOAPI const int ZOO_CHILD_EVENT;
+/**
+ * \brief a session has been lost.
+ *
+ * This is generated when a client loses contact or reconnects with a server.
+ */
+extern ZOOAPI const int ZOO_SESSION_EVENT;
+
+/**
+ * \brief a watch has been removed.
+ *
+ * This is generated when the server for some reason, probably a resource
+ * constraint, will no longer watch a node for a client.
+ */
+extern ZOOAPI const int ZOO_NOTWATCHING_EVENT;
+// @}
+
+/**
+ * \brief ZooKeeper handle.
+ *
+ * This is the handle that represents a connection to the ZooKeeper service.
+ * It is needed to invoke any ZooKeeper function. A handle is obtained using
+ * \ref zookeeper_init.
+ */
+typedef struct _zhandle zhandle_t;
+
+/**
+ * \brief client id structure.
+ *
+ * This structure holds the id and password for the session. This structure
+ * should be treated as opaque. It is received from the server when a session
+ * is established and needs to be sent back as-is when reconnecting a session.
+ */
+typedef struct {
+ int64_t client_id;
+ char passwd[16];
+} clientid_t;
+
+/**
+ * \brief signature of a watch function.
+ *
+ * There are two ways to receive watch notifications: legacy and watcher object.
+ * <p>
+ * The legacy style, an application wishing to receive events from ZooKeeper must
+ * first implement a function with this signature and pass a pointer to the function
+ * to \ref zookeeper_init. Next, the application sets a watch by calling one of
+ * the getter API that accept the watch integer flag (for example, \ref zoo_aexists,
+ * \ref zoo_get, etc).
+ * <p>
+ * The watcher object style uses an instance of a "watcher object" which in
+ * the C world is represented by a pair: a pointer to a function implementing this
+ * signature and a pointer to watcher context -- handback user-specific data.
+ * When a watch is triggered this function will be called along with
+ * the watcher context. An application wishing to use this style must use
+ * the getter API functions with the "w" prefix in their names (for example, \ref
+ * zoo_awexists, \ref zoo_wget, etc).
+ *
+ * \param zh zookeeper handle
+ * \param type event type. This is one of the *_EVENT constants.
+ * \param state connection state. The state value will be one of the *_STATE constants.
+ * \param path znode path for which the watcher is triggered. NULL if the event
+ * type is ZOO_SESSION_EVENT
+ * \param watcherCtx watcher context.
+ */
+typedef void (*watcher_fn)(zhandle_t *zh, int type,
+ int state, const char *path,void *watcherCtx);
+
+/**
+ * \brief create a handle used to communicate with zookeeper.
+ *
+ * This method creates a new handle and a zookeeper session that corresponds
+ * to that handle. Session establishment is asynchronous, meaning that the
+ * session should not be considered established until (and unless) an
+ * event of state ZOO_CONNECTED_STATE is received.
+ * \param host comma separated host:port pairs, each corresponding to a zk
+ * server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+ * \param fn the global watcher callback function. When notifications are
+ * triggered this function will be invoked.
+ * \param clientid the id of a previously established session that this
+ * client will be reconnecting to. Pass 0 if not reconnecting to a previous
+ * session. Clients can access the session id of an established, valid,
+ * connection by calling \ref zoo_client_id. If the session corresponding to
+ * the specified clientid has expired, or if the clientid is invalid for
+ * any reason, the returned zhandle_t will be invalid -- the zhandle_t
+ * state will indicate the reason for failure (typically
+ * ZOO_EXPIRED_SESSION_STATE).
+ * \param context the handback object that will be associated with this instance
+ * of zhandle_t. Application can access it (for example, in the watcher
+ * callback) using \ref zoo_get_context. The object is not used by zookeeper
+ * internally and can be null.
+ * \param flags reserved for future use. Should be set to zero.
+ * \return a pointer to the opaque zhandle structure. If it fails to create
+ * a new zhandle the function returns NULL and the errno variable
+ * indicates the reason.
+ */
+ZOOAPI zhandle_t *zookeeper_init(const char *host, watcher_fn fn,
+ int recv_timeout, const clientid_t *clientid, void *context, int flags);
+
+/**
+ * \brief close the zookeeper handle and free up any resources.
+ *
+ * After this call, the client session will no longer be valid. The function
+ * will flush any outstanding send requests before return. As a result it may
+ * block.
+ *
+ * This method should be called only once on a zookeeper handle. Calling
+ * twice will cause undefined (and probably undesirable behavior). Calling any other
+ * zookeeper method after calling close is undefined behaviour and should be avoided.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \return a result code. Regardless of the error code returned, the zhandle
+ * will be destroyed and all resources freed.
+ *
+ * ZOK - success
+ * ZBADARGUMENTS - invalid input parameters
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ * ZOPERATIONTIMEOUT - failed to flush the buffers within the specified timeout.
+ * ZCONNECTIONLOSS - a network error occured while attempting to send request to server
+ * ZSYSTEMERROR -- a system (OS) error occured; it's worth checking errno to get details
+ */
+ZOOAPI int zookeeper_close(zhandle_t *zh);
+
+/**
+ * \brief return the client session id, only valid if the connections
+ * is currently connected (ie. last watcher state is ZOO_CONNECTED_STATE)
+ */
+ZOOAPI const clientid_t *zoo_client_id(zhandle_t *zh);
+
+ZOOAPI int zoo_recv_timeout(zhandle_t *zh);
+
+ZOOAPI const void *zoo_get_context(zhandle_t *zh);
+
+ZOOAPI void zoo_set_context(zhandle_t *zh, void *context);
+
+/**
+ * \brief set a watcher function
+ * \return previous watcher function
+ */
+ZOOAPI watcher_fn zoo_set_watcher(zhandle_t *zh,watcher_fn newFn);
+
+#ifndef THREADED
+/**
+ * \brief Returns the events that zookeeper is interested in.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param fd is the file descriptor of interest
+ * \param interest is an or of the ZOOKEEPER_WRITE and ZOOKEEPER_READ flags to
+ * indicate the I/O of interest on fd.
+ * \param tv a timeout value to be used with select/poll system call
+ * \return a result code.
+ * ZOK - success
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZCONNECTIONLOSS - a network error occured while attempting to establish
+ * a connection to the server
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ * ZOPERATIONTIMEOUT - hasn't received anything from the server for 2/3 of the
+ * timeout value specified in zookeeper_init()
+ * ZSYSTEMERROR -- a system (OS) error occured; it's worth checking errno to get details
+ */
+ZOOAPI int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,
+ struct timeval *tv);
+
+/**
+ * \brief Notifies zookeeper that an event of interest has happened.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param events will be an OR of the ZOOKEEPER_WRITE and ZOOKEEPER_READ flags.
+ * \return a result code.
+ * ZOK - success
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZCONNECTIONLOSS - a network error occured while attempting to send request to server
+ * ZSESSIONEXPIRED - connection attempt failed -- the session's expired
+ * ZAUTHFAILED - authentication request failed, i.e. invalid credentials
+ * ZRUNTIMEINCONSISTENCY - a server response came out of order
+ * ZSYSTEMERROR -- a system (OS) error occured; it's worth checking errno to get details
+ * ZNOTHING -- not an error; simply indicates that there no more data from the server
+ * to be processed (when called with ZOOKEEPER_READ flag).
+ */
+ZOOAPI int zookeeper_process(zhandle_t *zh, int events);
+#endif
+
+/**
+ * \brief signature of a completion function for a call that returns void.
+ *
+ * This method will be invoked at the end of an asynchronous call and also as
+ * a result of connection loss or timeout.
+ * \param rc the error code of the call. Connection loss/timeout triggers
+ * the completion with one of the following error codes:
+ * ZCONNECTIONLOSS -- lost connection to the server
+ * ZOPERATIONTIMEOUT -- connection timed out
+ * Data related events trigger the completion with error codes listed in the
+ * Exceptions section of the documentation of the function that initiated the
+ * call. (Zero indicates call was successful.)
+ * \param data the pointer that was passed by the caller when the function
+ * that this completion corresponds to was invoked. The programmer
+ * is responsible for any memory freeing associated with the data
+ * pointer.
+ */
+typedef void (*void_completion_t)(int rc, const void *data);
+
+/**
+ * \brief signature of a completion function that returns a Stat structure.
+ *
+ * This method will be invoked at the end of an asynchronous call and also as
+ * a result of connection loss or timeout.
+ * \param rc the error code of the call. Connection loss/timeout triggers
+ * the completion with one of the following error codes:
+ * ZCONNECTIONLOSS -- lost connection to the server
+ * ZOPERATIONTIMEOUT -- connection timed out
+ * Data related events trigger the completion with error codes listed in the
+ * Exceptions section of the documentation of the function that initiated the
+ * call. (Zero indicates call was successful.)
+ * \param stat a pointer to the stat information for the node involved in
+ * this function. If a non zero error code is returned, the content of
+ * stat is undefined. The programmer is NOT responsible for freeing stat.
+ * \param data the pointer that was passed by the caller when the function
+ * that this completion corresponds to was invoked. The programmer
+ * is responsible for any memory freeing associated with the data
+ * pointer.
+ */
+typedef void (*stat_completion_t)(int rc, const struct Stat *stat,
+ const void *data);
+
+/**
+ * \brief signature of a completion function that returns data.
+ *
+ * This method will be invoked at the end of an asynchronous call and also as
+ * a result of connection loss or timeout.
+ * \param rc the error code of the call. Connection loss/timeout triggers
+ * the completion with one of the following error codes:
+ * ZCONNECTIONLOSS -- lost connection to the server
+ * ZOPERATIONTIMEOUT -- connection timed out
+ * Data related events trigger the completion with error codes listed in the
+ * Exceptions section of the documentation of the function that initiated the
+ * call. (Zero indicates call was successful.)
+ * \param value the value of the information returned by the asynchronous call.
+ * If a non zero error code is returned, the content of value is undefined.
+ * The programmer is NOT responsible for freeing value.
+ * \param value_len the number of bytes in value.
+ * \param stat a pointer to the stat information for the node involved in
+ * this function. If a non zero error code is returned, the content of
+ * stat is undefined. The programmer is NOT responsible for freeing stat.
+ * \param data the pointer that was passed by the caller when the function
+ * that this completion corresponds to was invoked. The programmer
+ * is responsible for any memory freeing associated with the data
+ * pointer.
+ */
+typedef void (*data_completion_t)(int rc, const char *value, int value_len,
+ const struct Stat *stat, const void *data);
+
+/**
+ * \brief signature of a completion function that returns a list of strings.
+ *
+ * This method will be invoked at the end of an asynchronous call and also as
+ * a result of connection loss or timeout.
+ * \param rc the error code of the call. Connection loss/timeout triggers
+ * the completion with one of the following error codes:
+ * ZCONNECTIONLOSS -- lost connection to the server
+ * ZOPERATIONTIMEOUT -- connection timed out
+ * Data related events trigger the completion with error codes listed in the
+ * Exceptions section of the documentation of the function that initiated the
+ * call. (Zero indicates call was successful.)
+ * \param strings a pointer to the structure containing the list of strings of the
+ * names of the children of a node. If a non zero error code is returned,
+ * the content of strings is undefined. The programmer is NOT responsible
+ * for freeing strings.
+ * \param data the pointer that was passed by the caller when the function
+ * that this completion corresponds to was invoked. The programmer
+ * is responsible for any memory freeing associated with the data
+ * pointer.
+ */
+typedef void (*strings_completion_t)(int rc,
+ const struct String_vector *strings, const void *data);
+
+/**
+ * \brief signature of a completion function that returns a list of strings.
+ *
+ * This method will be invoked at the end of an asynchronous call and also as
+ * a result of connection loss or timeout.
+ * \param rc the error code of the call. Connection loss/timeout triggers
+ * the completion with one of the following error codes:
+ * ZCONNECTIONLOSS -- lost connection to the server
+ * ZOPERATIONTIMEOUT -- connection timed out
+ * Data related events trigger the completion with error codes listed in the
+ * Exceptions section of the documentation of the function that initiated the
+ * call. (Zero indicates call was successful.)
+ * \param value the value of the string returned.
+ * \param data the pointer that was passed by the caller when the function
+ * that this completion corresponds to was invoked. The programmer
+ * is responsible for any memory freeing associated with the data
+ * pointer.
+ */
+typedef void
+ (*string_completion_t)(int rc, const char *value, const void *data);
+
+/**
+ * \brief signature of a completion function that returns an ACL.
+ *
+ * This method will be invoked at the end of an asynchronous call and also as
+ * a result of connection loss or timeout.
+ * \param rc the error code of the call. Connection loss/timeout triggers
+ * the completion with one of the following error codes:
+ * ZCONNECTIONLOSS -- lost connection to the server
+ * ZOPERATIONTIMEOUT -- connection timed out
+ * Data related events trigger the completion with error codes listed in the
+ * Exceptions section of the documentation of the function that initiated the
+ * call. (Zero indicates call was successful.)
+ * \param acl a pointer to the structure containing the ACL of a node. If a non
+ * zero error code is returned, the content of acl is undefined. The
+ * programmer is NOT responsible for freeing acl.
+ * \param stat a pointer to the stat information for the node involved in
+ * this function. If a non zero error code is returned, the content of
+ * stat is undefined. The programmer is NOT responsible for freeing stat.
+ * \param data the pointer that was passed by the caller when the function
+ * that this completion corresponds to was invoked. The programmer
+ * is responsible for any memory freeing associated with the data
+ * pointer.
+ */
+typedef void (*acl_completion_t)(int rc, struct ACL_vector *acl,
+ struct Stat *stat, const void *data);
+
+/**
+ * \brief get the state of the zookeeper connection.
+ *
+ * The return value will be one of the \ref State Consts.
+ */
+ZOOAPI int zoo_state(zhandle_t *zh);
+
+/**
+ * \brief create a node.
+ *
+ * This method will create a node in ZooKeeper. A node can only be created if
+ * it does not already exists. The Create Flags affect the creation of nodes.
+ * If ZOO_EPHEMERAL flag is set, the node will automatically get removed if the
+ * client session goes away. If the ZOO_SEQUENCE flag is set, a unique
+ * monotonically increasing sequence number is appended to the path name.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path The name of the node. Expressed as a file name with slashes
+ * separating ancestors of the node.
+ * \param value The data to be stored in the node.
+ * \param valuelen The number of bytes in data.
+ * \param acl The initial ACL of the node. If null, the ACL of the parent will be
+ * used.
+ * \param flags this parameter can be set to 0 for normal create or an OR
+ * of the Create Flags
+ * \param completion the routine to invoke when the request completes. The completion
+ * will be triggered with one of the following codes passed in as the rc argument:
+ * ZOK operation completed successfully
+ * ZNONODE the parent node does not exist.
+ * ZNODEEXISTS the node already exists
+ * ZNOAUTH the client does not have permission.
+ * ZNOCHILDRENFOREPHEMERALS cannot create children of ephemeral nodes.
+ * \param data The data that will be passed to the completion routine when the
+ * function completes.
+ * \return ZOK on success or one of the following errcodes on failure:
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_acreate(zhandle_t *zh, const char *path, const char *value,
+ int valuelen, const struct ACL_vector *acl, int flags,
+ string_completion_t completion, const void *data);
+
+/**
+ * \brief delete a node in zookeeper.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes
+ * separating ancestors of the node.
+ * \param version the expected version of the node. The function will fail if the
+ * actual version of the node does not match the expected version.
+ * If -1 is used the version check will not take place.
+ * \param completion the routine to invoke when the request completes. The completion
+ * will be triggered with one of the following codes passed in as the rc argument:
+ * ZOK operation completed successfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZBADVERSION expected version does not match actual version.
+ * ZNOTEMPTY children are present; node cannot be deleted.
+ * \param data the data that will be passed to the completion routine when
+ * the function completes.
+ * \return ZOK on success or one of the following errcodes on failure:
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_adelete(zhandle_t *zh, const char *path, int version,
+ void_completion_t completion, const void *data);
+
+/**
+ * \brief checks the existence of a node in zookeeper.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes
+ * separating ancestors of the node.
+ * \param watch if nonzero, a watch will be set at the server to notify the
+ * client if the node changes. The watch will be set even if the node does not
+ * exist. This allows clients to watch for nodes to appear.
+ * \param completion the routine to invoke when the request completes. The completion
+ * will be triggered with one of the following codes passed in as the rc argument:
+ * ZOK operation completed successfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * \param data the data that will be passed to the completion routine when the
+ * function completes.
+ * \return ZOK on success or one of the following errcodes on failure:
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_aexists(zhandle_t *zh, const char *path, int watch,
+ stat_completion_t completion, const void *data);
+
+/**
+ * \brief checks the existence of a node in zookeeper.
+ *
+ * This function is similar to \ref zoo_aexists except it allows one to specify
+ * a watcher object - a function pointer and associated context. The function
+ * will be called once the watch has fired. The associated context data will be
+ * passed to the function as the watcher context parameter.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes
+ * separating ancestors of the node.
+ * \param watcher if non-null a watch will be set on the specified znode on the server.
+ * The watch will be set even if the node does not exist. This allows clients
+ * to watch for nodes to appear.
+ * \param watcherCtx user specific data, will be passed to the watcher callback.
+ * Unlike the global context set by \ref zookeeper_init, this watcher context
+ * is associated with the given instance of the watcher only.
+ * \param completion the routine to invoke when the request completes. The completion
+ * will be triggered with one of the following codes passed in as the rc argument:
+ * ZOK operation completed successfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * \param data the data that will be passed to the completion routine when the
+ * function completes.
+ * \return ZOK on success or one of the following errcodes on failure:
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_awexists(zhandle_t *zh, const char *path,
+ watcher_fn watcher, void* watcherCtx,
+ stat_completion_t completion, const void *data);
+
+/**
+ * \brief gets the data associated with a node.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes
+ * separating ancestors of the node.
+ * \param watch if nonzero, a watch will be set at the server to notify
+ * the client if the node changes.
+ * \param completion the routine to invoke when the request completes. The completion
+ * will be triggered with one of the following codes passed in as the rc argument:
+ * ZOK operation completed successfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * \param data the data that will be passed to the completion routine when
+ * the function completes.
+ * \return ZOK on success or one of the following errcodes on failure:
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either in ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_aget(zhandle_t *zh, const char *path, int watch,
+ data_completion_t completion, const void *data);
+
+/**
+ * \brief gets the data associated with a node.
+ *
+ * This function is similar to \ref zoo_aget except it allows one to specify
+ * a watcher object rather than a boolean watch flag.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes
+ * separating ancestors of the node.
+ * \param watcher if non-null, a watch will be set at the server to notify
+ * the client if the node changes.
+ * \param watcherCtx user specific data, will be passed to the watcher callback.
+ * Unlike the global context set by \ref zookeeper_init, this watcher context
+ * is associated with the given instance of the watcher only.
+ * \param completion the routine to invoke when the request completes. The completion
+ * will be triggered with one of the following codes passed in as the rc argument:
+ * ZOK operation completed successfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * \param data the data that will be passed to the completion routine when
+ * the function completes.
+ * \return ZOK on success or one of the following errcodes on failure:
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either in ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_awget(zhandle_t *zh, const char *path,
+ watcher_fn watcher, void* watcherCtx,
+ data_completion_t completion, const void *data);
+
+/**
+ * \brief sets the data associated with a node.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes
+ * separating ancestors of the node.
+ * \param buffer the buffer holding data to be written to the node.
+ * \param buflen the number of bytes from buffer to write.
+ * \param version the expected version of the node. The function will fail if
+ * the actual version of the node does not match the expected version. If -1 is
+ * used the version check will not take place.
+ * (This call is asynchronous: the function returns immediately and invokes
+ * the completion routine when the request completes.)
+ * \param completion the routine to invoke when the request completes. The completion
+ * will be triggered with one of the following codes passed in as the rc argument:
+ * ZOK operation completed successfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZBADVERSION expected version does not match actual version.
+ * \param data the data that will be passed to the completion routine when
+ * the function completes.
+ * \return ZOK on success or one of the following errcodes on failure:
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_aset(zhandle_t *zh, const char *path, const char *buffer, int buflen,
+ int version, stat_completion_t completion, const void *data);
+
+/**
+ * \brief lists the children of a node.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes
+ * separating ancestors of the node.
+ * \param watch if nonzero, a watch will be set at the server to notify
+ * the client if the node changes.
+ * \param completion the routine to invoke when the request completes. The completion
+ * will be triggered with one of the following codes passed in as the rc argument:
+ * ZOK operation completed successfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * \param data the data that will be passed to the completion routine when
+ * the function completes.
+ * \return ZOK on success or one of the following errcodes on failure:
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_aget_children(zhandle_t *zh, const char *path, int watch,
+ strings_completion_t completion, const void *data);
+
+/**
+ * \brief lists the children of a node.
+ *
+ * This function is similar to \ref zoo_aget_children except it allows one to specify
+ * a watcher object rather than a boolean watch flag.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes
+ * separating ancestors of the node.
+ * \param watcher if non-null, a watch will be set at the server to notify
+ * the client if the node changes.
+ * \param watcherCtx user specific data, will be passed to the watcher callback.
+ * Unlike the global context set by \ref zookeeper_init, this watcher context
+ * is associated with the given instance of the watcher only.
+ * \param completion the routine to invoke when the request completes. The completion
+ * will be triggered with one of the following codes passed in as the rc argument:
+ * ZOK operation completed successfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * \param data the data that will be passed to the completion routine when
+ * the function completes.
+ * \return ZOK on success or one of the following errcodes on failure:
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_awget_children(zhandle_t *zh, const char *path,
+ watcher_fn watcher, void* watcherCtx,
+ strings_completion_t completion, const void *data);
+
+/**
+ * \brief Flush leader channel.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes
+ * separating ancestors of the node.
+ * \param completion the routine to invoke when the request completes. The completion
+ * will be triggered with one of the following codes passed in as the rc argument:
+ * ZOK operation completed successfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * \param data the data that will be passed to the completion routine when
+ * the function completes.
+ * \return ZOK on success or one of the following errcodes on failure:
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+
+ZOOAPI int zoo_async(zhandle_t *zh, const char *path,
+ string_completion_t completion, const void *data);
+
+
+/**
+ * \brief gets the acl associated with a node.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes
+ * separating ancestors of the node.
+ * \param completion the routine to invoke when the request completes. The completion
+ * will be triggered with one of the following codes passed in as the rc argument:
+ * ZOK operation completed successfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * \param data the data that will be passed to the completion routine when
+ * the function completes.
+ * \return ZOK on success or one of the following errcodes on failure:
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_aget_acl(zhandle_t *zh, const char *path, acl_completion_t completion,
+ const void *data);
+
+/**
+ * \brief sets the acl associated with a node.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes
+ * separating ancestors of the node.
+ * \param version the expected ACL version of the node; if -1 is used, the
+ * version check will not take place.
+ * \param completion the routine to invoke when the request completes. The completion
+ * will be triggered with one of the following codes passed in as the rc argument:
+ * ZOK operation completed successfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZINVALIDACL invalid ACL specified
+ * ZBADVERSION expected version does not match actual version.
+ * \param data the data that will be passed to the completion routine when
+ * the function completes.
+ * \return ZOK on success or one of the following errcodes on failure:
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_aset_acl(zhandle_t *zh, const char *path, int version,
+ struct ACL_vector *acl, void_completion_t, const void *data);
+
+/**
+ * \brief return an error string.
+ *
+ * \param c the error code returned by a zookeeper call
+ * \return string corresponding to the return code
+ */
+ZOOAPI const char* zerror(int c);
+
+/**
+ * \brief specify application credentials.
+ *
+ * The application calls this function to specify its credentials for purposes
+ * of authentication. The server will use the security provider specified by
+ * the scheme parameter to authenticate the client connection. If the
+ * authentication request has failed:
+ * - the server connection is dropped
+ * - the watcher is called with the ZOO_AUTH_FAILED_STATE value as the state
+ * parameter.
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param scheme the id of authentication scheme. Natively supported:
+ * "digest" password-based authentication
+ * \param cert application credentials. The actual value depends on the scheme.
+ * \param certLen the length of the data parameter
+ * \param completion the routine to invoke when the request completes. One of
+ * the following result codes may be passed into the completion callback:
+ * ZOK operation completed successfully
+ * ZAUTHFAILED authentication failed
+ * \param data the data that will be passed to the completion routine when the
+ * function completes.
+ * \return ZOK on success or one of the following errcodes on failure:
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ * ZSYSTEMERROR - a system error occurred
+ */
+ZOOAPI int zoo_add_auth(zhandle_t *zh,const char* scheme,const char* cert,
+ int certLen, void_completion_t completion, const void *data);
+
+/**
+ * \brief checks if the current zookeeper connection state can't be recovered.
+ *
+ * The application must close the zhandle and try to reconnect.
+ *
+ * \param zh the zookeeper handle (see \ref zookeeper_init)
+ * \return ZINVALIDSTATE if connection is unrecoverable
+ */
+ZOOAPI int is_unrecoverable(zhandle_t *zh);
+
+/**
+ * \brief sets the debugging level for the library
+ */
+ZOOAPI void zoo_set_debug_level(ZooLogLevel logLevel);
+
+/**
+ * \brief sets the stream to be used by the library for logging
+ *
+ * The zookeeper library uses stderr as its default log stream. Application
+ * must make sure the stream is writable. Passing in NULL resets the stream
+ * to its default value (stderr).
+ */
+ZOOAPI void zoo_set_log_stream(FILE* logStream);
+
+/**
+ * \brief enable/disable quorum endpoint order randomization
+ *
+ * If passed a non-zero value, will make the client connect to quorum peers
+ * in the order as specified in the zookeeper_init() call.
+ * A zero value causes zookeeper_init() to permute the peer endpoints
+ * which is good for more even client connection distribution among the
+ * quorum peers.
+ */
+ZOOAPI void zoo_deterministic_conn_order(int yesOrNo);
+
+/**
+ * \brief create a node synchronously.
+ *
+ * This method will create a node in ZooKeeper. A node can only be created if
+ * it does not already exists. The Create Flags affect the creation of nodes.
+ * If ZOO_EPHEMERAL flag is set, the node will automatically get removed if the
+ * client session goes away. If the ZOO_SEQUENCE flag is set, a unique
+ * monotonically increasing sequence number is appended to the path name.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path The name of the node. Expressed as a file name with slashes
+ * separating ancestors of the node.
+ * \param value The data to be stored in the node.
+ * \param valuelen The number of bytes in data. To set the data to be NULL use
+ * value as NULL and valuelen as -1.
+ * \param acl The initial ACL of the node. If null, the ACL of the parent will be
+ * used.
+ * \param flags this parameter can be set to 0 for normal create or an OR
+ * of the Create Flags
+ * \param path_buffer Buffer which will be filled with the path of the
+ * new node (this might be different than the supplied path
+ * because of the ZOO_SEQUENCE flag). The path string will always be
+ * null-terminated.
+ * \param path_buffer_len Size of path buffer; if the path of the new
+ * node (including space for the null terminator) exceeds the buffer size,
+ * the path string will be truncated to fit. The actual path of the
+ * new node in the server will not be affected by the truncation.
+ * The path string will always be null-terminated.
+ * \return one of the following codes are returned:
+ * ZOK operation completed successfully
+ * ZNONODE the parent node does not exist.
+ * ZNODEEXISTS the node already exists
+ * ZNOAUTH the client does not have permission.
+ * ZNOCHILDRENFOREPHEMERALS cannot create children of ephemeral nodes.
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_create(zhandle_t *zh, const char *path, const char *value,
+ int valuelen, const struct ACL_vector *acl, int flags,
+ char *path_buffer, int path_buffer_len);
+
+/**
+ * \brief delete a node in zookeeper synchronously.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes
+ * separating ancestors of the node.
+ * \param version the expected version of the node. The function will fail if the
+ * actual version of the node does not match the expected version.
+ * If -1 is used the version check will not take place.
+ * \return one of the following values is returned.
+ * ZOK operation completed successfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZBADVERSION expected version does not match actual version.
+ * ZNOTEMPTY children are present; node cannot be deleted.
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_delete(zhandle_t *zh, const char *path, int version);
+
+
+/**
+ * \brief checks the existence of a node in zookeeper synchronously.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes
+ * separating ancestors of the node.
+ * \param watch if nonzero, a watch will be set at the server to notify the
+ * client if the node changes. The watch will be set even if the node does not
+ * exist. This allows clients to watch for nodes to appear.
+ * \param stat the return stat value of the node.
+ * \return return code of the function call.
+ * ZOK operation completed successfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_exists(zhandle_t *zh, const char *path, int watch, struct Stat *stat);
+
+/**
+ * \brief checks the existence of a node in zookeeper synchronously.
+ *
+ * This function is similar to \ref zoo_exists except it allows one to specify
+ * a watcher object rather than a boolean watch flag.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes
+ * separating ancestors of the node.
+ * \param watcher if non-null a watch will be set on the specified znode on the server.
+ * The watch will be set even if the node does not exist. This allows clients
+ * to watch for nodes to appear.
+ * \param watcherCtx user specific data, will be passed to the watcher callback.
+ * Unlike the global context set by \ref zookeeper_init, this watcher context
+ * is associated with the given instance of the watcher only.
+ * \param stat the return stat value of the node.
+ * \return return code of the function call.
+ * ZOK operation completed successfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_wexists(zhandle_t *zh, const char *path,
+ watcher_fn watcher, void* watcherCtx, struct Stat *stat);
+
+/**
+ * \brief gets the data associated with a node synchronously.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes
+ * separating ancestors of the node.
+ * \param watch if nonzero, a watch will be set at the server to notify
+ * the client if the node changes.
+ * \param buffer the buffer holding the node data returned by the server
+ * \param buffer_len is the size of the buffer pointed to by the buffer parameter.
+ * It'll be set to the actual data length upon return. If the data is NULL, length is -1.
+ * \param stat if not NULL, will hold the value of stat for the path on return.
+ * \return return value of the function call.
+ * ZOK operation completed successfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either in ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_get(zhandle_t *zh, const char *path, int watch, char *buffer,
+ int* buffer_len, struct Stat *stat);
+
+/**
+ * \brief gets the data associated with a node synchronously.
+ *
+ * This function is similar to \ref zoo_get except it allows one to specify
+ * a watcher object rather than a boolean watch flag.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes
+ * separating ancestors of the node.
+ * \param watcher if non-null, a watch will be set at the server to notify
+ * the client if the node changes.
+ * \param watcherCtx user specific data, will be passed to the watcher callback.
+ * Unlike the global context set by \ref zookeeper_init, this watcher context
+ * is associated with the given instance of the watcher only.
+ * \param buffer the buffer holding the node data returned by the server
+ * \param buffer_len is the size of the buffer pointed to by the buffer parameter.
+ * It'll be set to the actual data length upon return. If the data is NULL, length is -1.
+ * \param stat if not NULL, will hold the value of stat for the path on return.
+ * \return return value of the function call.
+ * ZOK operation completed successfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either in ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_wget(zhandle_t *zh, const char *path,
+ watcher_fn watcher, void* watcherCtx,
+ char *buffer, int* buffer_len, struct Stat *stat);
+
+/**
+ * \brief sets the data associated with a node. See zoo_set2 function if
+ * you require access to the stat information associated with the znode.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes
+ * separating ancestors of the node.
+ * \param buffer the buffer holding data to be written to the node.
+ * \param buflen the number of bytes from buffer to write. To set NULL as data
+ * use buffer as NULL and buflen as -1.
+ * \param version the expected version of the node. The function will fail if
+ * the actual version of the node does not match the expected version. If -1 is
+ * used the version check will not take place.
+ * \return the return code for the function call.
+ * ZOK operation completed successfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZBADVERSION expected version does not match actual version.
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_set(zhandle_t *zh, const char *path, const char *buffer,
+ int buflen, int version);
+
+/**
+ * \brief sets the data associated with a node. This function is the same
+ * as zoo_set except that it also provides access to stat information
+ * associated with the znode.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes
+ * separating ancestors of the node.
+ * \param buffer the buffer holding data to be written to the node.
+ * \param buflen the number of bytes from buffer to write. To set NULL as data
+ * use buffer as NULL and buflen as -1.
+ * \param version the expected version of the node. The function will fail if
+ * the actual version of the node does not match the expected version. If -1 is
+ * used the version check will not take place.
+ * \param stat if not NULL, will hold the value of stat for the path on return.
+ * \return the return code for the function call.
+ * ZOK operation completed successfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZBADVERSION expected version does not match actual version.
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_set2(zhandle_t *zh, const char *path, const char *buffer,
+ int buflen, int version, struct Stat *stat);
+
+/**
+ * \brief lists the children of a node synchronously.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes
+ * separating ancestors of the node.
+ * \param watch if nonzero, a watch will be set at the server to notify
+ * the client if the node changes.
+ * \param strings return value of children paths.
+ * \return the return code of the function.
+ * ZOK operation completed successfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_get_children(zhandle_t *zh, const char *path, int watch,
+ struct String_vector *strings);
+
+/**
+ * \brief lists the children of a node synchronously.
+ *
+ * This function is similar to \ref zoo_get_children except it allows one to specify
+ * a watcher object rather than a boolean watch flag.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes
+ * separating ancestors of the node.
+ * \param watcher if non-null, a watch will be set at the server to notify
+ * the client if the node changes.
+ * \param watcherCtx user specific data, will be passed to the watcher callback.
+ * Unlike the global context set by \ref zookeeper_init, this watcher context
+ * is associated with the given instance of the watcher only.
+ * \param strings return value of children paths.
+ * \return the return code of the function.
+ * ZOK operation completed successfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_wget_children(zhandle_t *zh, const char *path,
+ watcher_fn watcher, void* watcherCtx,
+ struct String_vector *strings);
+
+/**
+ * \brief gets the acl associated with a node synchronously.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes
+ * separating ancestors of the node.
+ * \param acl the return value of acls on the path.
+ * \param stat returns the stat of the path specified.
+ * \return the return code for the function call.
+ * ZOK operation completed successfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_get_acl(zhandle_t *zh, const char *path, struct ACL_vector *acl,
+ struct Stat *stat);
+
+/**
+ * \brief sets the acl associated with a node synchronously.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes
+ * separating ancestors of the node.
+ * \param version the expected version of the path.
+ * \param acl the acl to be set on the path.
+ * \return the return code for the function call.
+ * ZOK operation completed successfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZINVALIDACL invalid ACL specified
+ * ZBADVERSION expected version does not match actual version.
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_set_acl(zhandle_t *zh, const char *path, int version,
+ const struct ACL_vector *acl);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /*ZOOKEEPER_H_*/
--- /dev/null
+#ifndef __ZOOKEEPER_JUTE__
+#define __ZOOKEEPER_JUTE__
+#include "recordio.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct Id {
+ char * scheme;
+ char * id;
+};
+int serialize_Id(struct oarchive *out, const char *tag, struct Id *v);
+int deserialize_Id(struct iarchive *in, const char *tag, struct Id*v);
+void deallocate_Id(struct Id*);
+struct ACL {
+ int32_t perms;
+ struct Id id;
+};
+int serialize_ACL(struct oarchive *out, const char *tag, struct ACL *v);
+int deserialize_ACL(struct iarchive *in, const char *tag, struct ACL*v);
+void deallocate_ACL(struct ACL*);
+struct Stat {
+ int64_t czxid;
+ int64_t mzxid;
+ int64_t ctime;
+ int64_t mtime;
+ int32_t version;
+ int32_t cversion;
+ int32_t aversion;
+ int64_t ephemeralOwner;
+ int32_t dataLength;
+ int32_t numChildren;
+ int64_t pzxid;
+};
+int serialize_Stat(struct oarchive *out, const char *tag, struct Stat *v);
+int deserialize_Stat(struct iarchive *in, const char *tag, struct Stat*v);
+void deallocate_Stat(struct Stat*);
+struct StatPersisted {
+ int64_t czxid;
+ int64_t mzxid;
+ int64_t ctime;
+ int64_t mtime;
+ int32_t version;
+ int32_t cversion;
+ int32_t aversion;
+ int64_t ephemeralOwner;
+ int64_t pzxid;
+};
+int serialize_StatPersisted(struct oarchive *out, const char *tag, struct StatPersisted *v);
+int deserialize_StatPersisted(struct iarchive *in, const char *tag, struct StatPersisted*v);
+void deallocate_StatPersisted(struct StatPersisted*);
+struct StatPersistedV1 {
+ int64_t czxid;
+ int64_t mzxid;
+ int64_t ctime;
+ int64_t mtime;
+ int32_t version;
+ int32_t cversion;
+ int32_t aversion;
+ int64_t ephemeralOwner;
+};
+int serialize_StatPersistedV1(struct oarchive *out, const char *tag, struct StatPersistedV1 *v);
+int deserialize_StatPersistedV1(struct iarchive *in, const char *tag, struct StatPersistedV1*v);
+void deallocate_StatPersistedV1(struct StatPersistedV1*);
+struct op_result_t {
+ int32_t rc;
+ int32_t op;
+ struct buffer response;
+};
+int serialize_op_result_t(struct oarchive *out, const char *tag, struct op_result_t *v);
+int deserialize_op_result_t(struct iarchive *in, const char *tag, struct op_result_t*v);
+void deallocate_op_result_t(struct op_result_t*);
+struct ConnectRequest {
+ int32_t protocolVersion;
+ int64_t lastZxidSeen;
+ int32_t timeOut;
+ int64_t sessionId;
+ struct buffer passwd;
+};
+int serialize_ConnectRequest(struct oarchive *out, const char *tag, struct ConnectRequest *v);
+int deserialize_ConnectRequest(struct iarchive *in, const char *tag, struct ConnectRequest*v);
+void deallocate_ConnectRequest(struct ConnectRequest*);
+struct ConnectResponse {
+ int32_t protocolVersion;
+ int32_t timeOut;
+ int64_t sessionId;
+ struct buffer passwd;
+};
+int serialize_ConnectResponse(struct oarchive *out, const char *tag, struct ConnectResponse *v);
+int deserialize_ConnectResponse(struct iarchive *in, const char *tag, struct ConnectResponse*v);
+void deallocate_ConnectResponse(struct ConnectResponse*);
+struct String_vector {
+ int32_t count;
+ char * *data;
+;
+};
+int serialize_String_vector(struct oarchive *out, const char *tag, struct String_vector *v);
+int deserialize_String_vector(struct iarchive *in, const char *tag, struct String_vector *v);
+int allocate_String_vector(struct String_vector *v, int32_t len);
+int deallocate_String_vector(struct String_vector *v);
+struct SetWatches {
+ int64_t relativeZxid;
+ struct String_vector dataWatches;
+ struct String_vector existWatches;
+ struct String_vector childWatches;
+};
+int serialize_SetWatches(struct oarchive *out, const char *tag, struct SetWatches *v);
+int deserialize_SetWatches(struct iarchive *in, const char *tag, struct SetWatches*v);
+void deallocate_SetWatches(struct SetWatches*);
+struct RequestHeader {
+ int32_t xid;
+ int32_t type;
+};
+int serialize_RequestHeader(struct oarchive *out, const char *tag, struct RequestHeader *v);
+int deserialize_RequestHeader(struct iarchive *in, const char *tag, struct RequestHeader*v);
+void deallocate_RequestHeader(struct RequestHeader*);
+struct AuthPacket {
+ int32_t type;
+ char * scheme;
+ struct buffer auth;
+};
+int serialize_AuthPacket(struct oarchive *out, const char *tag, struct AuthPacket *v);
+int deserialize_AuthPacket(struct iarchive *in, const char *tag, struct AuthPacket*v);
+void deallocate_AuthPacket(struct AuthPacket*);
+struct ReplyHeader {
+ int32_t xid;
+ int64_t zxid;
+ int32_t err;
+};
+int serialize_ReplyHeader(struct oarchive *out, const char *tag, struct ReplyHeader *v);
+int deserialize_ReplyHeader(struct iarchive *in, const char *tag, struct ReplyHeader*v);
+void deallocate_ReplyHeader(struct ReplyHeader*);
+struct GetDataRequest {
+ char * path;
+ int32_t watch;
+};
+int serialize_GetDataRequest(struct oarchive *out, const char *tag, struct GetDataRequest *v);
+int deserialize_GetDataRequest(struct iarchive *in, const char *tag, struct GetDataRequest*v);
+void deallocate_GetDataRequest(struct GetDataRequest*);
+struct SetDataRequest {
+ char * path;
+ struct buffer data;
+ int32_t version;
+};
+int serialize_SetDataRequest(struct oarchive *out, const char *tag, struct SetDataRequest *v);
+int deserialize_SetDataRequest(struct iarchive *in, const char *tag, struct SetDataRequest*v);
+void deallocate_SetDataRequest(struct SetDataRequest*);
+struct SetDataResponse {
+ struct Stat stat;
+};
+int serialize_SetDataResponse(struct oarchive *out, const char *tag, struct SetDataResponse *v);
+int deserialize_SetDataResponse(struct iarchive *in, const char *tag, struct SetDataResponse*v);
+void deallocate_SetDataResponse(struct SetDataResponse*);
+struct ACL_vector {
+ int32_t count;
+ struct ACL *data;
+;
+};
+int serialize_ACL_vector(struct oarchive *out, const char *tag, struct ACL_vector *v);
+int deserialize_ACL_vector(struct iarchive *in, const char *tag, struct ACL_vector *v);
+int allocate_ACL_vector(struct ACL_vector *v, int32_t len);
+int deallocate_ACL_vector(struct ACL_vector *v);
+struct CreateRequest {
+ char * path;
+ struct buffer data;
+ struct ACL_vector acl;
+ int32_t flags;
+};
+int serialize_CreateRequest(struct oarchive *out, const char *tag, struct CreateRequest *v);
+int deserialize_CreateRequest(struct iarchive *in, const char *tag, struct CreateRequest*v);
+void deallocate_CreateRequest(struct CreateRequest*);
+struct DeleteRequest {
+ char * path;
+ int32_t version;
+};
+int serialize_DeleteRequest(struct oarchive *out, const char *tag, struct DeleteRequest *v);
+int deserialize_DeleteRequest(struct iarchive *in, const char *tag, struct DeleteRequest*v);
+void deallocate_DeleteRequest(struct DeleteRequest*);
+struct GetChildrenRequest {
+ char * path;
+ int32_t watch;
+};
+int serialize_GetChildrenRequest(struct oarchive *out, const char *tag, struct GetChildrenRequest *v);
+int deserialize_GetChildrenRequest(struct iarchive *in, const char *tag, struct GetChildrenRequest*v);
+void deallocate_GetChildrenRequest(struct GetChildrenRequest*);
+struct GetMaxChildrenRequest {
+ char * path;
+};
+int serialize_GetMaxChildrenRequest(struct oarchive *out, const char *tag, struct GetMaxChildrenRequest *v);
+int deserialize_GetMaxChildrenRequest(struct iarchive *in, const char *tag, struct GetMaxChildrenRequest*v);
+void deallocate_GetMaxChildrenRequest(struct GetMaxChildrenRequest*);
+struct GetMaxChildrenResponse {
+ int32_t max;
+};
+int serialize_GetMaxChildrenResponse(struct oarchive *out, const char *tag, struct GetMaxChildrenResponse *v);
+int deserialize_GetMaxChildrenResponse(struct iarchive *in, const char *tag, struct GetMaxChildrenResponse*v);
+void deallocate_GetMaxChildrenResponse(struct GetMaxChildrenResponse*);
+struct SetMaxChildrenRequest {
+ char * path;
+ int32_t max;
+};
+int serialize_SetMaxChildrenRequest(struct oarchive *out, const char *tag, struct SetMaxChildrenRequest *v);
+int deserialize_SetMaxChildrenRequest(struct iarchive *in, const char *tag, struct SetMaxChildrenRequest*v);
+void deallocate_SetMaxChildrenRequest(struct SetMaxChildrenRequest*);
+struct SyncRequest {
+ char * path;
+};
+int serialize_SyncRequest(struct oarchive *out, const char *tag, struct SyncRequest *v);
+int deserialize_SyncRequest(struct iarchive *in, const char *tag, struct SyncRequest*v);
+void deallocate_SyncRequest(struct SyncRequest*);
+struct SyncResponse {
+ char * path;
+};
+int serialize_SyncResponse(struct oarchive *out, const char *tag, struct SyncResponse *v);
+int deserialize_SyncResponse(struct iarchive *in, const char *tag, struct SyncResponse*v);
+void deallocate_SyncResponse(struct SyncResponse*);
+struct GetACLRequest {
+ char * path;
+};
+int serialize_GetACLRequest(struct oarchive *out, const char *tag, struct GetACLRequest *v);
+int deserialize_GetACLRequest(struct iarchive *in, const char *tag, struct GetACLRequest*v);
+void deallocate_GetACLRequest(struct GetACLRequest*);
+struct SetACLRequest {
+ char * path;
+ struct ACL_vector acl;
+ int32_t version;
+};
+int serialize_SetACLRequest(struct oarchive *out, const char *tag, struct SetACLRequest *v);
+int deserialize_SetACLRequest(struct iarchive *in, const char *tag, struct SetACLRequest*v);
+void deallocate_SetACLRequest(struct SetACLRequest*);
+struct SetACLResponse {
+ struct Stat stat;
+};
+int serialize_SetACLResponse(struct oarchive *out, const char *tag, struct SetACLResponse *v);
+int deserialize_SetACLResponse(struct iarchive *in, const char *tag, struct SetACLResponse*v);
+void deallocate_SetACLResponse(struct SetACLResponse*);
+struct WatcherEvent {
+ int32_t type;
+ int32_t state;
+ char * path;
+};
+int serialize_WatcherEvent(struct oarchive *out, const char *tag, struct WatcherEvent *v);
+int deserialize_WatcherEvent(struct iarchive *in, const char *tag, struct WatcherEvent*v);
+void deallocate_WatcherEvent(struct WatcherEvent*);
+struct CreateResponse {
+ char * path;
+};
+int serialize_CreateResponse(struct oarchive *out, const char *tag, struct CreateResponse *v);
+int deserialize_CreateResponse(struct iarchive *in, const char *tag, struct CreateResponse*v);
+void deallocate_CreateResponse(struct CreateResponse*);
+struct ExistsRequest {
+ char * path;
+ int32_t watch;
+};
+int serialize_ExistsRequest(struct oarchive *out, const char *tag, struct ExistsRequest *v);
+int deserialize_ExistsRequest(struct iarchive *in, const char *tag, struct ExistsRequest*v);
+void deallocate_ExistsRequest(struct ExistsRequest*);
+struct ExistsResponse {
+ struct Stat stat;
+};
+int serialize_ExistsResponse(struct oarchive *out, const char *tag, struct ExistsResponse *v);
+int deserialize_ExistsResponse(struct iarchive *in, const char *tag, struct ExistsResponse*v);
+void deallocate_ExistsResponse(struct ExistsResponse*);
+struct GetDataResponse {
+ struct buffer data;
+ struct Stat stat;
+};
+int serialize_GetDataResponse(struct oarchive *out, const char *tag, struct GetDataResponse *v);
+int deserialize_GetDataResponse(struct iarchive *in, const char *tag, struct GetDataResponse*v);
+void deallocate_GetDataResponse(struct GetDataResponse*);
+struct GetChildrenResponse {
+ struct String_vector children;
+};
+int serialize_GetChildrenResponse(struct oarchive *out, const char *tag, struct GetChildrenResponse *v);
+int deserialize_GetChildrenResponse(struct iarchive *in, const char *tag, struct GetChildrenResponse*v);
+void deallocate_GetChildrenResponse(struct GetChildrenResponse*);
+struct GetACLResponse {
+ struct ACL_vector acl;
+ struct Stat stat;
+};
+int serialize_GetACLResponse(struct oarchive *out, const char *tag, struct GetACLResponse *v);
+int deserialize_GetACLResponse(struct iarchive *in, const char *tag, struct GetACLResponse*v);
+void deallocate_GetACLResponse(struct GetACLResponse*);
+struct Id_vector {
+ int32_t count;
+ struct Id *data;
+;
+};
+int serialize_Id_vector(struct oarchive *out, const char *tag, struct Id_vector *v);
+int deserialize_Id_vector(struct iarchive *in, const char *tag, struct Id_vector *v);
+int allocate_Id_vector(struct Id_vector *v, int32_t len);
+int deallocate_Id_vector(struct Id_vector *v);
+struct QuorumPacket {
+ int32_t type;
+ int64_t zxid;
+ struct buffer data;
+ struct Id_vector authinfo;
+};
+int serialize_QuorumPacket(struct oarchive *out, const char *tag, struct QuorumPacket *v);
+int deserialize_QuorumPacket(struct iarchive *in, const char *tag, struct QuorumPacket*v);
+void deallocate_QuorumPacket(struct QuorumPacket*);
+struct FileHeader {
+ int32_t magic;
+ int32_t version;
+ int64_t dbid;
+};
+int serialize_FileHeader(struct oarchive *out, const char *tag, struct FileHeader *v);
+int deserialize_FileHeader(struct iarchive *in, const char *tag, struct FileHeader*v);
+void deallocate_FileHeader(struct FileHeader*);
+struct TxnHeader {
+ int64_t clientId;
+ int32_t cxid;
+ int64_t zxid;
+ int64_t time;
+ int32_t type;
+};
+int serialize_TxnHeader(struct oarchive *out, const char *tag, struct TxnHeader *v);
+int deserialize_TxnHeader(struct iarchive *in, const char *tag, struct TxnHeader*v);
+void deallocate_TxnHeader(struct TxnHeader*);
+struct CreateTxn {
+ char * path;
+ struct buffer data;
+ struct ACL_vector acl;
+ int32_t ephemeral;
+};
+int serialize_CreateTxn(struct oarchive *out, const char *tag, struct CreateTxn *v);
+int deserialize_CreateTxn(struct iarchive *in, const char *tag, struct CreateTxn*v);
+void deallocate_CreateTxn(struct CreateTxn*);
+struct DeleteTxn {
+ char * path;
+};
+int serialize_DeleteTxn(struct oarchive *out, const char *tag, struct DeleteTxn *v);
+int deserialize_DeleteTxn(struct iarchive *in, const char *tag, struct DeleteTxn*v);
+void deallocate_DeleteTxn(struct DeleteTxn*);
+struct SetDataTxn {
+ char * path;
+ struct buffer data;
+ int32_t version;
+};
+int serialize_SetDataTxn(struct oarchive *out, const char *tag, struct SetDataTxn *v);
+int deserialize_SetDataTxn(struct iarchive *in, const char *tag, struct SetDataTxn*v);
+void deallocate_SetDataTxn(struct SetDataTxn*);
+struct SetACLTxn {
+ char * path;
+ struct ACL_vector acl;
+ int32_t version;
+};
+int serialize_SetACLTxn(struct oarchive *out, const char *tag, struct SetACLTxn *v);
+int deserialize_SetACLTxn(struct iarchive *in, const char *tag, struct SetACLTxn*v);
+void deallocate_SetACLTxn(struct SetACLTxn*);
+struct SetMaxChildrenTxn {
+ char * path;
+ int32_t max;
+};
+int serialize_SetMaxChildrenTxn(struct oarchive *out, const char *tag, struct SetMaxChildrenTxn *v);
+int deserialize_SetMaxChildrenTxn(struct iarchive *in, const char *tag, struct SetMaxChildrenTxn*v);
+void deallocate_SetMaxChildrenTxn(struct SetMaxChildrenTxn*);
+struct CreateSessionTxn {
+ int32_t timeOut;
+};
+int serialize_CreateSessionTxn(struct oarchive *out, const char *tag, struct CreateSessionTxn *v);
+int deserialize_CreateSessionTxn(struct iarchive *in, const char *tag, struct CreateSessionTxn*v);
+void deallocate_CreateSessionTxn(struct CreateSessionTxn*);
+struct ErrorTxn {
+ int32_t err;
+};
+int serialize_ErrorTxn(struct oarchive *out, const char *tag, struct ErrorTxn *v);
+int deserialize_ErrorTxn(struct iarchive *in, const char *tag, struct ErrorTxn*v);
+void deallocate_ErrorTxn(struct ErrorTxn*);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif //ZOOKEEPER_JUTE__
--- /dev/null
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ZK_LOG_H_
+#define ZK_LOG_H_
+
+#include <zookeeper.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+extern ZOOAPI ZooLogLevel logLevel;
+#define LOGSTREAM getLogStream()
+
+#define LOG_ERROR(x) if(logLevel>=ZOO_LOG_LEVEL_ERROR) \
+ log_message(ZOO_LOG_LEVEL_ERROR,__LINE__,__func__,format_log_message x)
+#define LOG_WARN(x) if(logLevel>=ZOO_LOG_LEVEL_WARN) \
+ log_message(ZOO_LOG_LEVEL_WARN,__LINE__,__func__,format_log_message x)
+#define LOG_INFO(x) if(logLevel>=ZOO_LOG_LEVEL_INFO) \
+ log_message(ZOO_LOG_LEVEL_INFO,__LINE__,__func__,format_log_message x)
+#define LOG_DEBUG(x) if(logLevel==ZOO_LOG_LEVEL_DEBUG) \
+ log_message(ZOO_LOG_LEVEL_DEBUG,__LINE__,__func__,format_log_message x)
+
+ZOOAPI void log_message(ZooLogLevel curLevel, int line,const char* funcName,
+ const char* message);
+
+ZOOAPI const char* format_log_message(const char* format,...);
+
+FILE* getLogStream();
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /*ZK_LOG_H_*/
--- /dev/null
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ZOOKEEPER_VERSION_H_
+#define ZOOKEEPER_VERSION_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define ZOO_MAJOR_VERSION 3
+#define ZOO_MINOR_VERSION 2
+#define ZOO_PATCH_VERSION 1
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ZOOKEEPER_VERSION_H_ */
# It will never go above this rate, even if capacity is
# available. A value of 0 means unlimited.
nodelimit=0
-
-# Leave this at FPS right now...
+# DRL allocation policy: FPS for flow proportional share
+# or GRD for global random drop.
policy=FPS
-
-# Smallest interval at which identities are processed.
+# estintms is the system tick time (estimate interval),
+# in milliseconds.
estintms=500
-
-# Log file location.
-drl_logfile="/var/log/drl.log"
-
+# The location of the DRL logfile.
+drl_logfile="/var/log/ulogd-drl.log"
# The verbosity of the DRL logfile. 1 = most verbose,
# 3 = errors only.
-drl_loglevel 2
-
-# Location of the DRL xml configuration file.
-drl_configfile="/etc/drl.xml"
-
+drl_loglevel=3
+# The location of the DRL configuration file, which
+# specifies which identities should be installed.
+drl_configfile="@etcdir@/drl.xml"
[NETFLOW]
# PlanetLab NetFlow logging
%attr(0755,root,root) %{_sbindir}/ulogd
#%attr(0755,root,root) %{_bindir}/netflow-import
%{_sysconfdir}/ulogd.conf
+%{_sysconfdir}/drl.xml
%{_sysconfdir}/logrotate.d/ulogd
%attr(0755,root,root) %{_sysconfdir}/rc.d/init.d/ulogd
%{_mandir}/man8/*