From c9d6255f0c06ee41eb2c06a5f74a957ec7be3223 Mon Sep 17 00:00:00 2001 From: Kevin Webb Date: Fri, 22 Jan 2010 20:41:46 +0000 Subject: [PATCH] Updates to autotools for library detection Lots of changes to the gossip code to facilitate new group membership options Cleaned up some old code --- Makefile.in | 5 +- Rules.make.in | 6 +- configure.in | 87 +- drl.xml | 8 +- drl/Makefile.in | 6 +- drl/config.c | 64 +- drl/config.h | 16 + drl/drl_state.c | 194 ++-- drl/drl_state.h | 193 ++-- drl/estimate.c | 211 +---- drl/peer_comm.c | 651 +++++++------ drl/peer_comm.h | 26 +- drl/raterouter.h | 10 +- drl/ratetypes.h | 4 + drl/swim.c | 853 +++++++++++++++++ drl/swim.h | 67 ++ drl/ulogd_DRL.c | 57 +- drl/zk_drl.c | 346 +++++++ drl/zk_drl.h | 39 + include/zookeeper/recordio.h | 76 ++ include/zookeeper/zookeeper.h | 1249 +++++++++++++++++++++++++ include/zookeeper/zookeeper.jute.h | 376 ++++++++ include/zookeeper/zookeeper_log.h | 51 + include/zookeeper/zookeeper_version.h | 33 + ulogd.conf.in | 23 +- ulogd.spec | 1 + 26 files changed, 3993 insertions(+), 659 deletions(-) create mode 100644 drl/swim.c create mode 100644 drl/swim.h create mode 100644 drl/zk_drl.c create mode 100644 drl/zk_drl.h create mode 100644 include/zookeeper/recordio.h create mode 100644 include/zookeeper/zookeeper.h create mode 100644 include/zookeeper/zookeeper.jute.h create mode 100644 include/zookeeper/zookeeper_log.h create mode 100644 include/zookeeper/zookeeper_version.h diff --git a/Makefile.in b/Makefile.in index 618d44c..0358fd8 100644 --- a/Makefile.in +++ b/Makefile.in @@ -62,9 +62,9 @@ recurse: @for d in $(SUBDIRS); do if ! 
make -C $$d; then exit 1; fi; done ulogd: ulogd.c include/ulogd/ulogd.h ulogd.conf recurse - $(CC) $(CFLAGS) -rdynamic $< conffile/conffile.o $(LIBIPULOG)/libipulog.a -o $@ $(LDFLAGS) $(LIBS) `xml2-config --libs` + $(CC) $(CFLAGS) -rdynamic $< conffile/conffile.o $(LIBIPULOG)/libipulog.a -o $@ $(LDFLAGS) $(LIBS) $(XML_LDFLAGS) -edit = sed -e 's,@libdir\@,$(ULOGD_LIB_PATH),g' +edit = sed -e 's,@libdir\@,$(ULOGD_LIB_PATH),g' -e 's,@etcdir\@,$(DESTDIR)$(ETCDIR),g' ulogd.conf: ulogd.conf.in $(edit) ulogd.conf.in > ulogd.conf @@ -80,6 +80,7 @@ install: all @INSTALL@ -D -m 755 ulogd $(DESTDIR)$(BINDIR)/ulogd @[ -d $(DESTDIR)$(ETCDIR) ] || mkdir -p $(DESTDIR)$(ETCDIR) @[ -f $(DESTDIR)$(ETCDIR)/ulogd.conf ] || @INSTALL@ -D -m 600 ulogd.conf $(DESTDIR)$(ETCDIR)/ulogd.conf + @[ -f $(DESTDIR)$(ETCDIR)/drl.xml ] || @INSTALL@ -D -m 600 drl.xml $(DESTDIR)$(ETCDIR)/drl.xml doc: $(MAKE) -C $@ diff --git a/Rules.make.in b/Rules.make.in index 99a7f1a..ec9679f 100644 --- a/Rules.make.in +++ b/Rules.make.in @@ -35,7 +35,6 @@ endif LIBS=@LIBS@ - # Names of the plugins to be compiled ULOGD_SL:=BASE OPRINT PWSNIFF LOGEMU LOCAL SYSLOG @@ -54,3 +53,8 @@ PGSQL_LDFLAGS=@DATABASE_LIB_DIR@ @PGSQL_LIB@ SQLITE3_CFLAGS=-I@SQLITE3INCLUDES@ @EXTRA_SQLITE3_DEF@ SQLITE3_LDFLAGS=@DATABASE_LIB_DIR@ @SQLITE3_LIB@ +XML_CFLAGS=@XMLINCLUDES@ +XML_LDFLAGS=@XMLLIBS@ + +ZK_CFLAGS=@ZKFLAGS@ +ZK_LDFLAGS=@ZKLIBS@ diff --git a/configure.in b/configure.in index 2f888fb..7bd3640 100644 --- a/configure.in +++ b/configure.in @@ -65,10 +65,6 @@ dnl test for MySQL dnl AC_ARG_WITH(mysql, --with-mysql= mysql installed in ,[ -if test $withval == no -then - AC_MSG_WARN("mysql disabled.") -else if test $withval != yes then dir=$withval @@ -126,8 +122,7 @@ else AC_MSG_RESULT(found new MySQL) fi -fi -fi +fi ]) @@ -146,6 +141,86 @@ AC_ARG_WITH(mysql-log-ip-as-string, AC_MSG_WARN(the use of --with-mysql-log-ip-as-string is discouraged) ]) +dnl +dnl test for libxml2 +dnl +AC_ARG_WITH(libxml2, --with-libxml= libxml2 installed in 
,[ +if test $withval != yes +then + dir=$withval +else + dir="/usr/local" +fi]) + +libxmldir="" +AC_MSG_CHECKING(for LIBXML2 files) +for d in $dir/bin /usr/bin /usr/local/bin /usr/local/libxml2/bin /opt/libxml2/bin /opt/packages/libxml2/bin +do + if test -x $d/xml2-config + then + AC_MSG_RESULT(found xml2-config in $d) + libxmldir=$d + break + fi +done + +if test x$libxmldir = x +then + AC_MSG_ERROR(xml2-config not found) +else + XMLINCLUDES=`$libxmldir/xml2-config --cflags` + AC_SUBST(XMLINCLUDES) + + XMLLIBS=`$libxmldir/xml2-config --libs` + AC_SUBST(XMLLIBS) +fi + +dnl +dnl check for zookeeper library +dnl +zkdir="/usr/local/lib" + +AC_ARG_WITH(zookeeper, --with-zookeeperlib= zookeeper shared object located in ,[ +if test $withval = no +then + zkdir=no + AC_MSG_WARN(Building without zookeeper support.) +else + zkdir=$withval +fi]) + +zklib="" +if test $zkdir != no +then + AC_MSG_CHECKING(for zookeeper libraries) + for d in $zkdir /usr/local/lib /lib /usr/lib + do + if test -f $d/libzookeeper_mt.so + then + AC_MSG_RESULT(found libzookeeper_mt.so in $d) + zklib=$d/libzookeeper_mt.so + break + fi + done + + if test x$zklib = x + then + dnl no zklib + AC_MSG_WARN(Zookeeper libraries not found.) 
+ ZKLIBS="" + AC_SUBST(ZKLIBS) + + ZKFLAGS="" + AC_SUBST(ZKFLAGS) + else + dnl found it + ZKLIBS=$zklib + AC_SUBST(ZKLIBS) + + ZKFLAGS="-DTHREADED -DBUILD_ZOOKEEPER" + AC_SUBST(ZKFLAGS) + fi +fi dnl dnl test for PostgreSQL diff --git a/drl.xml b/drl.xml index 3b3f4b6..619a6ec 100644 --- a/drl.xml +++ b/drl.xml @@ -1,6 +1,10 @@ + + diff --git a/drl/Makefile.in b/drl/Makefile.in index 0315752..5eca2e2 100644 --- a/drl/Makefile.in +++ b/drl/Makefile.in @@ -1,21 +1,21 @@ # include @top_srcdir@/Rules.make -CFLAGS+=-I@top_srcdir@ -I@top_srcdir@/libipulog/include -I@top_srcdir@/include `xml2-config --cflags` +CFLAGS+=-I@top_srcdir@ -I@top_srcdir@/libipulog/include -I@top_srcdir@/include -I@top_srcdir@/include/zookeeper $(XML_CFLAGS) $(ZK_CFLAGS) SH_CFLAGS:=$(CFLAGS) -fPIC # Normally You should not need to change anything below # SHARED_LIBS=ulogd_DRL.so -OBJECTS=config.o drl_state.o estimate.o logging.o multipleinterval.o peer_comm.o samplehold.o simple.o standard.o ulogd_DRL.o util.o +OBJECTS=config.o drl_state.o estimate.o logging.o multipleinterval.o peer_comm.o samplehold.o simple.o standard.o swim.o ulogd_DRL.o util.o zk_drl.o all: $(SHARED_LIBS) distrib: $(SHARED_LIBS): $(OBJECTS) - $(LD) $(LDFLAGS) -shared -o $@ $(OBJECTS) -lc + $(LD) $(LDFLAGS) -shared -o $@ $(OBJECTS) -lc $(ZK_LDFLAGS) %.o: %.c $(CC) $(CFLAGS) -c $< diff --git a/drl/config.c b/drl/config.c index 42f4472..d9d9ede 100644 --- a/drl/config.c +++ b/drl/config.c @@ -106,6 +106,8 @@ static int parse_common(xmlDocPtr doc, xmlNodePtr ident, ident_config *common) { xmlChar *limit; xmlChar *commfabric; xmlChar *branch; + xmlChar *membership; + xmlChar *failure_behavior; xmlChar *accounting; xmlChar *ewma; xmlChar *mainloop_intervals; @@ -114,6 +116,12 @@ static int parse_common(xmlDocPtr doc, xmlNodePtr ident, ident_config *common) { xmlNodePtr fields = ident->children; ident_peer *current = NULL; + /* The struct has been memsetted to 0, this is just to be safe. 
*/ + common->zk_host = NULL; + common->peers = NULL; + common->members = NULL; + common->next = NULL; + /* Make sure no required fields are missing. */ id = xmlGetProp(ident, (const xmlChar *) "id"); if (id == NULL) { @@ -150,7 +158,7 @@ static int parse_common(xmlDocPtr doc, xmlNodePtr ident, ident_config *common) { xmlFree(commfabric); } - /* Only care about branching factor if we're using gossip. */ + /* Only care about branching factor and failure detector if we're using gossip. */ if (common->commfabric == COMM_GOSSIP) { branch = xmlGetProp(ident, (const xmlChar *) "branch"); if (branch == NULL) { @@ -160,6 +168,46 @@ static int parse_common(xmlDocPtr doc, xmlNodePtr ident, ident_config *common) { common->branch = atoi((const char *) branch); xmlFree(branch); } + + membership = xmlGetProp(ident, (const xmlChar *) "membership"); + if (membership == NULL) { + printlog(LOG_CRITICAL, "Ident missing membership protocol selection.\n"); + return EINVAL; + } else { + if (!xmlStrcmp(membership, (const xmlChar *) "SWIM")) { + common->membership = SWIM; + } else if (!xmlStrcmp(membership, (const xmlChar *) "ZOOKEEPER")) { +#ifdef BUILD_ZOOKEEPER + common->membership = ZOOKEEPER; +#else + printlog(LOG_CRITICAL, "Zookeeper requested, but support not compiled into DRL at configure time.\n"); + xmlFree(membership); + return EINVAL; +#endif + } else { + printlog(LOG_CRITICAL, "Unknown/invalid gossip group membership protocol.\n"); + xmlFree(membership); + return EINVAL; + } + xmlFree(membership); + } + + failure_behavior = xmlGetProp(ident, (const xmlChar *) "failure_behavior"); + if (failure_behavior == NULL) { + printlog(LOG_CRITICAL, "Ident missing failure handling behavior.\n"); + return EINVAL; + } else { + if (!xmlStrcmp(failure_behavior, (const xmlChar *) "PANIC")) { + common->failure_behavior = PANIC; + } else if (!xmlStrcmp(failure_behavior, (const xmlChar *) "QUORUM")) { + common->failure_behavior = QUORUM; + } else { + printlog(LOG_CRITICAL, "Unknown/invalid gossip 
failure behavior policy.\n"); + xmlFree(failure_behavior); + return EINVAL; + } + xmlFree(failure_behavior); + } } accounting = xmlGetProp(ident, (const xmlChar *) "accounting"); @@ -243,6 +291,15 @@ static int parse_common(xmlDocPtr doc, xmlNodePtr ident, ident_config *common) { current->next = NULL; } xmlFree(ip); + } else if ((!xmlStrcmp(fields->name, (const xmlChar *) "zkhost"))) { + xmlChar *host = xmlNodeListGetString(doc, fields->children, 1); + + common->zk_host = strdup((const char *) host); + if (common->zk_host == NULL) { + return ENOMEM; + } + + xmlFree(host); } fields = fields->next; } @@ -252,6 +309,11 @@ static int parse_common(xmlDocPtr doc, xmlNodePtr ident, ident_config *common) { return EINVAL; } + if (common->membership == ZOOKEEPER && common->zk_host == NULL) { + printlog(LOG_CRITICAL, "Group membership protocol ZOOKEEPER requires a zkhost field.\n"); + return EINVAL; + } + /* No errors. */ return 0; } diff --git a/drl/config.h b/drl/config.h index 0b202c7..883b641 100644 --- a/drl/config.h +++ b/drl/config.h @@ -69,6 +69,22 @@ typedef struct ident_config { /** The gossip branch factor (when commfabric is COMM_GOSSIP). */ int branch; + /** The gossip group membership policy (SWIM, ZOOKEEPER). */ + enum memberships membership; + + /** The behavioral policy to use when one or more failures in group + * membership are detected. */ + enum failure_behaviors failure_behavior; + +#ifdef BUILD_ZOOKEEPER + + /** The host string that should be passed to zookeeper_init when using + * zookeeper. This consists of comma-separated ipaddr:port pairs. Example: + * "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" */ + char *zk_host; + +#endif + /** The flow accounting mechanism to be used by this identity. 
*/ enum accountings accounting; diff --git a/drl/drl_state.c b/drl/drl_state.c index ef57abe..d0baa4a 100644 --- a/drl/drl_state.c +++ b/drl/drl_state.c @@ -27,12 +27,61 @@ #include "ratetypes.h" #include "drl_state.h" #include "peer_comm.h" +#include "swim.h" #include "logging.h" +#ifdef BUILD_ZOOKEEPER + #include "zk_drl.h" +#endif + extern limiter_t limiter; +static int group_membership_init(comm_t *comm, uint32_t id, ident_config *config) { + switch (comm->gossip.membership) { + case SWIM: + return swim_init(comm, id); + break; + +#ifdef BUILD_ZOOKEEPER + + case ZOOKEEPER: + return zk_drl_init(comm, id, &limiter, config); + break; + +#endif + + default: + printlog(LOG_CRITICAL, "drl_state.c: This shouldn't happen!\n"); + return EINVAL; + } +} + +static void group_membership_teardown(comm_t *comm) { + switch (comm->gossip.membership) { + case SWIM: + swim_teardown(comm); + break; + +#ifdef BUILD_ZOOKEEPER + + case ZOOKEEPER: + zk_drl_close(comm); + break; + +#endif + + default: + printlog(LOG_CRITICAL, "drl_state.c: This shouldn't happen!\n"); + } +} + +void null_restart_function(comm_t *comm, int32_t view_number) { + /* Intentionally empty. */ +} + int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) { int i; + int result = 0; memset(comm, 0, sizeof(comm_t)); @@ -40,17 +89,32 @@ int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) { comm->transport_proto = UDP; comm->remote_node_count = config->peer_count; comm->gossip.gossip_branch = config->branch; + comm->gossip.membership = config->membership; + comm->gossip.failure_behavior = config->failure_behavior; comm->gossip.weight = 1.0; pthread_mutex_init(&comm->lock, NULL); - - /* Set send function. 
*/ + + // allocate memory to the indices + comm->indices = (int*) malloc(sizeof(int)*comm->remote_node_count); + memset(comm->indices, 0, sizeof(int)*comm->remote_node_count); + for(i = 0; i < comm->remote_node_count; i++) + comm->indices[i] = i; + comm->shuffle_index = comm->remote_node_count; + + /* Set default comm function pointers. These may get overwritten later + * by more specific initialization routines such as group membership + * init calls. */ switch (config->commfabric) { case COMM_MESH: comm->send_function = send_udp_mesh; + comm->recv_function = recv_mesh; + comm->restart_function = null_restart_function; break; case COMM_GOSSIP: comm->send_function = send_udp_gossip; + comm->recv_function = recv_gossip; + comm->restart_function = null_restart_function; break; } @@ -61,8 +125,7 @@ int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) { } /* Allocate remote_limiters array and fill it in. Add remotes to map. */ - comm->remote_limiters = - malloc(config->peer_count * sizeof(remote_limiter_t)); + comm->remote_limiters = malloc(config->peer_count * sizeof(remote_limiter_t)); if (comm->remote_limiters == NULL) { pthread_mutex_destroy(&comm->lock); @@ -76,13 +139,18 @@ int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) { comm->remote_limiters[i].addr = remote_nodes[i].addr; comm->remote_limiters[i].port = remote_nodes[i].port; comm->remote_limiters[i].outgoing.next_seqno = 1; + comm->remote_limiters[i].reachability = REACHABLE; + comm->remote_limiters[i].awol = 0; + comm->remote_limiters[i].count_rounds = 0; + comm->remote_limiters[i].count_awol = 0; + comm->remote_limiters[i].count_alive = 0; map_insert(comm->remote_node_map, (void *) &(remote_nodes[i]), sizeof(remote_node_t), &comm->remote_limiters[i]); } - - /* Allocate and initialize retrys. */ - comm->retrys = malloc(config->branch * sizeof(int)); - if (comm->retrys == NULL) { + + /* Allocate and initialize selected. 
*/ + comm->selected = malloc(config->branch * sizeof(int)); + if (comm->selected == NULL) { pthread_mutex_destroy(&comm->lock); free_map(comm->remote_node_map, 0); free(comm->remote_limiters); @@ -90,14 +158,28 @@ int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) { } for (i = 0; i < config->branch; ++i) { - comm->retrys[i] = -1; + comm->selected[i] = -1; } - return 0; + if (config->commfabric == COMM_GOSSIP) { + result = group_membership_init(comm, config->id, config); + if (result) { + pthread_mutex_destroy(&comm->lock); + free_map(comm->remote_node_map, 0); + free(comm->remote_limiters); + free(comm->selected); + } + } + + return result; } void free_comm(comm_t *comm) { if (comm) { + if (comm->comm_fabric == COMM_GOSSIP) { + group_membership_teardown(comm); + } + if (comm->remote_limiters) { free(comm->remote_limiters); } @@ -112,70 +194,76 @@ void free_comm(comm_t *comm) { pthread_mutex_destroy(&comm->lock); - if (comm->retrys) { - free(comm->retrys); + if (comm->selected) { + free(comm->selected); } } } -int read_comm(comm_t *comm, double *aggregate, double decayto) { +int read_comm(double *aggregate, uint32_t *effective_global, comm_t *comm, uint32_t global_limit) { remote_limiter_t *remote; pthread_mutex_lock(&comm->lock); if (comm->comm_fabric == COMM_MESH) { *aggregate = 0; + *effective_global = global_limit; map_reset_iterate(comm->remote_node_map); while ((remote = map_next(comm->remote_node_map))) { - /* remote->rate corresponds to the rate (GRD) or weight (FPS) - * in generated by the peer remote. */ - *aggregate += remote->rate; - - /* If we continue to read it without having heard an update, - * we start to make the peer's value approach decayto, getting - * half of the way there each time. 
*/ - if (remote->awol >= MESH_REMOTE_AWOL_THRESHOLD) { + if (remote->reachability != REACHABLE) { printlog(LOG_WARN, "AWOL remote limiter detected.\n"); - remote->rate += ((decayto - remote->rate) / 2); + *effective_global -= (global_limit / (comm->remote_node_count + 1)); } else { - remote->awol++; + /* remote->rate corresponds to the rate (GRD) or weight (FPS) + * in generated by the peer remote. */ + *aggregate += remote->rate; } } *aggregate += comm->local_rate; } else if (comm->comm_fabric == COMM_GOSSIP) { - int i; - int threshold = GOSSIP_REMOTE_AWOL_THRESHOLD; double value = 0; + int i; value = (comm->gossip.value / comm->gossip.weight); value *= (comm->remote_node_count + 1); - /* Keep around the last value so that we don't stupidly pick 0 when - * we're negative. If we pick 0, it looks to the limiter like it - * has free reign and it will take 100% of the rate allocation for - * itself. This is a lie. Open question what to do here... FIXME: Use decayto?*/ - if (value <= 0) { - //*aggregate = comm->gossip.last_nonzero; - *aggregate = 0; - printlog(LOG_DEBUG, "Gossip: Read aggregate of 0 from comm layer.\n"); - } else { - *aggregate = value; - comm->gossip.last_nonzero = *aggregate; - printlog(LOG_DEBUG, "Gossip: Read aggregate of %.3f from comm layer.\n", value); - } - - for (i = 0; i < comm->remote_node_count; ++i) { - if (comm->remote_limiters[i].awol == threshold) { - /* Re-claim any value/weight sent. */ - comm->gossip.value += comm->remote_limiters[i].outgoing.saved_value; - comm->gossip.weight += comm->remote_limiters[i].outgoing.saved_weight; + /* Look up the failure handling policy and check to see if it is + * is currently relevant. 
*/ + if (comm->gossip.failure_behavior == PANIC) { + int panic = 0; + if (!comm->connected) { + panic = 1; + } - comm->remote_limiters[i].outgoing.saved_value = 0.0; - comm->remote_limiters[i].outgoing.saved_weight = 0.0; + for (i = 0; i < comm->remote_node_count; ++i) { + if (comm->remote_limiters[i].reachability != REACHABLE) { + panic = 1; + } + } - comm->remote_limiters[i].awol += 1; - } else if (comm->remote_limiters[i].awol < threshold) { - comm->remote_limiters[i].awol += 1; + if (panic) { + printlog(LOG_DEBUG, "GOSSIP: Panicking!\n"); + *aggregate = comm->local_rate; + *effective_global = (global_limit / (comm->remote_node_count + 1)); + } else { + *aggregate = (value > 0) ? value : 0; + *effective_global = global_limit; + } + } else if (comm->gossip.failure_behavior == QUORUM) { + *effective_global = global_limit; + if (comm->connected) { + for (i = 0; i < comm->remote_node_count; ++i) { + if (comm->remote_limiters[i].reachability != REACHABLE) { + *effective_global -= (global_limit / (comm->remote_node_count + 1)); + } + } + *aggregate = (value > 0) ? value : 0; + } else { + /* Not part of the Quorum - do 1/n. 
*/ + printlog(LOG_DEBUG, "GOSSIP: Not in the quorum...Panicking!\n"); + *aggregate = comm->local_rate; + *effective_global = (global_limit / (comm->remote_node_count + 1)); } } + printlog(LOG_DEBUG, "GOSSIP: Read aggregate of %.3f from comm layer.\n", *aggregate); } else { printlog(LOG_CRITICAL, "read_comm: Invalid comm fabric: %d.\n", comm->comm_fabric); @@ -192,14 +280,11 @@ int write_local_value(comm_t *comm, const double value) { if (comm->comm_fabric == COMM_MESH) { comm->last_local_rate = comm->local_rate; comm->local_rate = value; - comm->rate_change = comm->local_rate - comm->last_local_rate; } else if (comm->comm_fabric == COMM_GOSSIP) { comm->last_local_rate = comm->local_rate; comm->local_rate = value; - comm->rate_change = comm->local_rate - comm->last_local_rate; - /*printf("new: %f, old: %f, weight: %f, diff: %f\n", comm->gossip.value + (comm->gossip.weight * comm->rate_change), comm->gossip.value, comm->gossip.weight, comm->rate_change);*/ - /*comm->gossip.value = comm->gossip.value + (comm->gossip.weight * comm->rate_change);*/ - comm->gossip.value += comm->rate_change; + comm->gossip.value += (comm->local_rate - comm->last_local_rate); + printlog(LOG_DEBUG, "GOSSIP: value: %.3f, new gossip.value: %.3f\n", value, comm->gossip.value); } else { printlog(LOG_CRITICAL, "write_local_value: Invalid comm fabric: %d.\n", @@ -225,6 +310,7 @@ int send_update(comm_t *comm, uint32_t id) { } void *limiter_receive_thread(void *unused) { + printlog(LOG_DEBUG, "GOSSIP: Starting the limiter_receive thread.\n"); sigset_t signal_mask; sigemptyset(&signal_mask); diff --git a/drl/drl_state.h b/drl/drl_state.h index 950ea7e..4fb710e 100644 --- a/drl/drl_state.h +++ b/drl/drl_state.h @@ -30,17 +30,23 @@ #define MAX_IDENTS (1024) #define MAX_LIMITERS (128) +#define TRUE (1) +#define FALSE (0) -#define MESH_REMOTE_AWOL_THRESHOLD (5) - -//FIXME: Make this more scientific? 
-#define GOSSIP_REMOTE_AWOL_THRESHOLD (10 * comm->remote_node_count / comm->gossip.gossip_branch) - -enum transports { UDP, TCP }; +enum transports { UDP = 0, TCP = 1 }; +enum view_confidences { IN = 0, NOTIN = 1, UNSURE = 2 }; +enum reachabilities { REACHABLE = 0, SUSPECT = 1, UNREACHABLE = 2 }; typedef struct gossipval { + /* Fields that don't change. */ int gossip_branch; - double last_nonzero; + enum memberships membership; + enum failure_behaviors failure_behavior; + + /* Fields that change only on restart. */ + int32_t view; + + /* Fields that change frequently. */ double value; double weight; } gossip_t; @@ -63,6 +69,7 @@ typedef struct remote_node { in_port_t port; } remote_node_t; +//TODO: Clean this up typedef struct remote_limiter { /** The last known value at the remote limiter. */ double rate; @@ -71,19 +78,62 @@ typedef struct remote_limiter { out_neighbor_t outgoing; /* Socket to contact this remote limiter, if using TCP. */ - int socket; + //int socket; + /** IP address of the remote limiter, in network byte order. */ in_addr_t addr; in_port_t port; - /** Flag to keep track of situations in which we read from this node's - * value more than once before receiving an update from it. We use this - * value to know when it's safe to begin decaying the remote node's value - * (because we assume that it has failed). */ + /** Keeps track of the number of messages we have sent to this peer without + * having heard from them. */ int awol; + /** Whether or not we think this peer is reachable. */ + enum reachabilities reachability; + + /**Count of the rounds since doubt has risen and count of friends which + * suspect this node to be awol or alive*/ + int count_rounds; + int count_awol; + int count_alive; + + uint32_t incarnation; + + int32_t view; + enum view_confidences view_confidence; } remote_limiter_t; +//TODO: Reduce the size of this? 
+typedef struct message { + uint32_t magic; + uint32_t ident_id; + double value; + double weight; + uint32_t seqno; + uint32_t min_seqno; + uint16_t type; + + /** tell ping target the address of node which requested ping */ + in_addr_t ping_source; + in_port_t ping_port; + /** friend needs to be told the address of node suspected to be down */ + in_addr_t check_target; + in_port_t check_port; + /** friend responds with ALIVE / AWOL */ + uint32_t checkack_value; + /*Whether the message has an update piggy backed onto it*/ + uint32_t update_present; // TRUE or FALSE + /*Node is reachable or not*/ + uint32_t reachability; + /*Incarnation number of the node whose update + * is being sent piggy backed on the message*/ + uint32_t incarnation; + /*Address of the node whose update is being sent*/ + remote_node_t node; + + uint32_t view; +} message_t; + typedef struct comm { /** Communication policy. (COMM_MESH, COMM_GOSSIP) */ enum commfabrics comm_fabric; @@ -97,8 +147,6 @@ typedef struct comm { /** Previous local value. */ double last_local_rate; - double rate_change; - /** The number of remote nodes in the identity */ uint32_t remote_node_count; @@ -122,65 +170,41 @@ typedef struct comm { /** Function pointer to send function. */ int (*send_function)(struct comm *comm, uint32_t id, int sock); -#if 0 - /** Thread for handling incoming TCP data. */ - pthread_t tcp_recv_thread; - - /** Descriptor set for reading TCP messages */ - fd_set fds; -#endif + /** Function pointer to recv function for group membership. When a message + * is received, it is proccessed normally and then handed to this function + * in case additional processing is necessary for group membership. */ + int (*recv_function)(struct comm *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg); - /** Array of integers specifiying which nodes, if any, have outstanding - * unacked data. When nodes fall in this category, and it's time to send, - * these nodes will be chosen first. 
This only affects gossip. A - * negative number means there is no retransmit necessary. Otherwise, the - * value is the index into the remote_limiters array of the necessary - * retransmit. */ - int *retrys; + /** Function to restart the communication protocol. */ + void (*restart_function)(struct comm *comm, int32_t view_number); -} comm_t; + /** Flag indicating whether or not we are "connected" to the group + * membership service. This can only be false for membership schemes that + * require a persistent connection (Zookeeper). */ + int connected; -typedef struct message { - uint32_t magic; - uint32_t ident_id; - uint32_t seqno; - uint32_t min_seqno; - double value; - double weight; - uint16_t type; -} message_t; + /** Array of integers specifiying which nodes have been selected for + * message transmissions during the current round. */ + int *selected; -typedef struct hello_message { - uint32_t magic; - uint32_t ident_id; - uint16_t port; -} hello_t; + /** Array of indicies into remote_limiters. Used to keep a shuffled + * ordering for future gossip targets. */ + int *indices; + + /** The next index to use for target peer selection. The indicies are + * re-shuffled when this reaches remote_node_count. */ + uint32_t shuffle_index; -#if 0 -struct recv_thread_args { - comm_ident_t *ident; - pthread_rwlock_t *lock; - uint16_t port; -}; -#endif + void *membership_state; #if 0 -/** - * Initializes the global limiter. - * - * @param ipaddr The IP address on which the limiter should listen. - * INADDR_ANY will suffice. Should be specified in network byte order. - * - * @param port The port on which the limiter should listen. Should be specified - * in network byte order. - */ -void init_limiter(const in_addr_t ipaddr, const in_port_t port); + /** Thread for handling incoming TCP data. */ + pthread_t tcp_recv_thread; -/** - * Deallocates the entire global limiter. 
- */ -void destroy_limiter(); + /** Descriptor set for reading TCP messages */ + fd_set fds; #endif +} comm_t; /** * Fills in the communication structure of an identity. @@ -206,18 +230,9 @@ void free_comm(comm_t *comm); * Calculates and reads the current aggregate value for an identity. * This value includes the locally observed value. * - * @param comm The comm structure of the identity in question. - * - * @param aggregate The location at which the aggregate value will - * be stored. - * - * @param decayto When using a mesh comm fabric, limiters whose value - * has not been heard in several timesteps will decay to this value. - * Generally globallimit/N. - * * @returns 0 on success, EINVAL on error. */ -int read_comm(comm_t *comm, double *aggregate, double decayto); +int read_comm(double *aggregate, uint32_t *effective_global, comm_t *comm, uint32_t global_limit); /** * Updates the locally observed value of an identity. @@ -254,4 +269,34 @@ int send_update(comm_t *comm, uint32_t id); */ void *limiter_receive_thread(void *unused); +#if 0 +typedef struct hello_message { + uint32_t magic; + uint32_t ident_id; + uint16_t port; +} hello_t; + +struct recv_thread_args { + comm_ident_t *ident; + pthread_rwlock_t *lock; + uint16_t port; +}; + +/** + * Initializes the global limiter. + * + * @param ipaddr The IP address on which the limiter should listen. + * INADDR_ANY will suffice. Should be specified in network byte order. + * + * @param port The port on which the limiter should listen. Should be specified + * in network byte order. + */ +void init_limiter(const in_addr_t ipaddr, const in_port_t port); + +/** + * Deallocates the entire global limiter. 
+ */ +void destroy_limiter(); +#endif + #endif /* _DRL_STATE_ */ diff --git a/drl/estimate.c b/drl/estimate.c index 3cbf95a..24adbbf 100644 --- a/drl/estimate.c +++ b/drl/estimate.c @@ -63,31 +63,14 @@ static double allocate_fps_under_limit(identity_t *ident, uint32_t target, doubl double ideal_weight; double total_weight = peer_weights + ident->last_localweight; - if (target >= ident->limit) { + if (target >= ident->effective_limit) { ideal_weight = total_weight; } else if (target <= 0) { ideal_weight = 0; // no flows here } else { - ideal_weight = ((double)target / (double)ident->limit) * total_weight; + ideal_weight = ((double)target / (double)ident->effective_limit) * total_weight; } -#if 0 - else if (peer_weights <= 0) { -#if 0 - // doesn't matter what we pick as our weight, so pick 1 / N. - ideal_weight = MAX_FLOW_SCALING_FACTOR / (remote_count(ident->i_handle) + 1); -#endif - ideal_weight = ((double)target / (double)ident->limit) * total_weight; - } else { -#if 0 - double divisor = (double) ident->limit - (double) target; - ideal_weight = ((double) target * peer_weights) / divisor; -#else - ideal_weight = ((double)target / (double)ident->limit) * total_weight; -#endif - } -#endif - return ideal_weight; } @@ -180,7 +163,7 @@ static uint32_t allocate_fps(identity_t *ident, double total_weight, if (ident->dampen_state == DAMPEN_TEST) { int64_t rate_delta = (int64_t) table->inst_rate - (int64_t) table->last_inst_rate; - double threshold = (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10; + double threshold = (double) ident->effective_limit * (double) LARGE_INCREASE_PERCENTAGE / 10; if (rate_delta > threshold) { ident->dampen_state = DAMPEN_PASSED; @@ -282,7 +265,7 @@ static uint32_t allocate_fps(identity_t *ident, double total_weight, /* Convert weight value into a rate limit. If there is no measureable * weight, do a L/n allocation. 
*/ if (total_weight > 0) { - resulting_limit = (uint32_t) (ident->localweight * ident->limit / total_weight); + resulting_limit = (uint32_t) (ident->localweight * ident->effective_limit / total_weight); } else { resulting_limit = (uint32_t) (ident->limit / (ident->comm.remote_node_count + 1)); } @@ -313,7 +296,7 @@ static void allocate_fps_pretend(identity_t *ident, double total_weight, if (ident->dampen_state_copy == DAMPEN_TEST) { int64_t rate_delta = (int64_t) table->inst_rate - (int64_t) table->last_inst_rate; - double threshold = (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10; + double threshold = (double) ident->effective_limit * (double) LARGE_INCREASE_PERCENTAGE / 10; if (rate_delta > threshold) { ident->dampen_state_copy = DAMPEN_PASSED; @@ -393,7 +376,7 @@ static void allocate_fps_pretend(identity_t *ident, double total_weight, /* Convert weight value into a rate limit. If there is no measureable * weight, do a L/n allocation. */ if (total_weight > 0) { - resulting_limit = (uint32_t) (ident->localweight_copy * ident->limit / total_weight); + resulting_limit = (uint32_t) (ident->localweight_copy * ident->effective_limit / total_weight); } else { resulting_limit = (uint32_t) (ident->limit / (ident->comm.remote_node_count + 1)); } @@ -404,168 +387,12 @@ static void allocate_fps_pretend(identity_t *ident, double total_weight, #endif -/** - * Determines the amount of FPS weight to allocate to the identity during each - * estimate interval. Note that total_weight includes local weight. 
- */ -static uint32_t allocate_fps_old(identity_t *ident, double total_weight) { - common_accounting_t *ftable = &ident->common; /* Common flow table info */ - uint32_t local_rate = ftable->rate; - uint32_t ideallocal = 0; - double peer_weights; /* sum of weights of all other limiters */ - double idealweight = 0; - double last_portion = 0; - double this_portion = 0; - - static int dampen = 0; - int dampen_increase = 0; - - double ideal_under = 0; - double ideal_over = 0; - - int regime = 0; - - /* two cases: - 1. the aggregate is < limit - 2. the aggregate is >= limit - */ - peer_weights = total_weight - ident->last_localweight; - if (peer_weights < 0) { - peer_weights = 0; - } - - if (dampen == 1) { - int64_t rate_delta = - (int64_t) ftable->inst_rate - (int64_t) ftable->last_inst_rate; - double threshold = - (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10; - - if (rate_delta > threshold) { - dampen_increase = 1; - printlog(LOG_DEBUG, "DAMPEN: delta(%.3f) thresh(%.3f)\n", - rate_delta, threshold); - } - } - - if (local_rate <= 0) { - idealweight = 0; - } else if (dampen_increase == 0 && - (ident->locallimit <= 0 || local_rate < close_enough(ident->locallimit) || ident->flowstart)) { - /* We're under the limit - all flows are bottlenecked. 
*/ - idealweight = allocate_fps_under_limit(ident, local_rate, peer_weights); - ideal_over = allocate_fps_over_limit(ident); - ideal_under = idealweight; - - if (ideal_over < idealweight) { - idealweight = ideal_over; - regime = 3; - dampen = 2; - } else { - regime = 1; - dampen = 0; - } - - /* Apply EWMA */ - ident->localweight = (ident->localweight * ident->ewma_weight + - idealweight * (1 - ident->ewma_weight)); - - } else { - idealweight = allocate_fps_over_limit(ident); - - /* Apply EWMA */ - ident->localweight = (ident->localweight * ident->ewma_weight + - idealweight * (1 - ident->ewma_weight)); - - /* This is the portion of the total weight in the system that was caused - * by this limiter in the last interval. */ - last_portion = ident->last_localweight / total_weight; - - /* This is the fraction of the total weight in the system that our - * proposed value for idealweight would use. */ - this_portion = ident->localweight / (peer_weights + ident->localweight); - - /* Dampen the large increase the first time... 
*/ - if (dampen == 0 && (this_portion - last_portion > LARGE_INCREASE_PERCENTAGE)) { - ident->localweight = ident->last_localweight + (LARGE_INCREASE_PERCENTAGE * total_weight); - dampen = 1; - } else { - dampen = 2; - } - - ideal_under = allocate_fps_under_limit(ident, local_rate, peer_weights); - ideal_over = idealweight; - - regime = 2; - } - - /* Convert weight into a rate - add in our new local weight */ - ident->total_weight = total_weight = ident->localweight + peer_weights; - - /* compute local allocation: - if there is traffic elsewhere, use the weights - otherwise do a L/n allocation */ - if (total_weight > 0) { - //if (peer_weights > 0) { - ideallocal = (uint32_t) (ident->localweight * ident->limit / total_weight); - } else { - ideallocal = ident->limit / (ident->comm.remote_node_count + 1); - } - - printlog(LOG_DEBUG, "%.3f ActualWeight\n", ident->localweight); - - printlog(LOG_DEBUG, "%.3f %.3f %.3f %.3f Under / Over / Actual / Rate\n", - ideal_under / (ideal_under + peer_weights), - ideal_over / (ideal_over + peer_weights), - ident->localweight / (ident->localweight + peer_weights), - (double) local_rate / (double) ident->limit); - - printlog(LOG_DEBUG, "%.3f %.3f IdealUnd IdealOve\n",ideal_under,ideal_over); - - if (system_loglevel == LOG_DEBUG) { - printf("local_rate: %d, idealweight: %.3f, localweight: %.3f, total_weight: %.3f\n", - local_rate, idealweight, ident->localweight, total_weight); - } - -#if 0 - if (printcounter <= 0) { - struct timeval tv; - double time_now; - - gettimeofday(&tv, NULL); - time_now = (double) tv.tv_sec + (double) ((double) tv.tv_usec / (double) 1000000); - - printlog(LOG_WARN, "%.2f %d %.2f %.2f %.2f %d %d %d %d %d %d %d %d ", time_now, ftable->inst_rate, - idealweight, ident->localweight, total_weight, ftable->num_flows, ftable->num_flows_5k, - ftable->num_flows_10k, ftable->num_flows_20k, ftable->num_flows_50k, ftable->avg_rate, - ftable->max_flow_rate, ftable->max_flow_rate_flow_hash); - - printcounter = 
PRINT_COUNTER_RESET; - } else { - printcounter -= 1; - } - - //printf("Dampen: %d, dampen_increase: %d, peer_weights: %.3f, regime: %d\n", - // dampen, dampen_increase, peer_weights, regime); - - if (regime == 3) { - printlog(LOG_DEBUG, "MIN: min said to use flow counting, which was %.3f when other method said %.3f.\n", - ideal_over, ideal_under); - } - See print_statistics() -#endif - - printlog(LOG_DEBUG, "ideallocal is %d\n", ideallocal); - - return(ideallocal); -} - /** * Determines the local drop probability for a GRD identity every estimate * interval. */ static double allocate_grd(identity_t *ident, double aggdemand) { double dropprob; - double global_limit = ident->limit; double min_dropprob = ident->drop_prob * GRD_BIG_DROP; struct timeval tv; @@ -575,8 +402,8 @@ static double allocate_grd(identity_t *ident, double aggdemand) { gettimeofday(&tv, NULL); time_now = (double) tv.tv_sec + (double) ((double) tv.tv_usec / (double) 1000000); - if (aggdemand > global_limit) { - dropprob = (aggdemand-global_limit)/aggdemand; + if (aggdemand > ident->effective_limit) { + dropprob = (aggdemand - ident->effective_limit) / aggdemand; } else { dropprob = 0.0; } @@ -615,18 +442,12 @@ static double allocate_grd(identity_t *ident, double aggdemand) { */ static void allocate(limiter_t *limiter, identity_t *ident) { /* Represents aggregate rate for GRD and aggregate weight for FPS. */ - double comm_val = 0; - - /* Read comm_val from comm layer. */ - if (limiter->policy == POLICY_FPS) { - read_comm(&ident->comm, &comm_val, - ident->total_weight / (double) (ident->comm.remote_node_count + 1)); - } else { - read_comm(&ident->comm, &comm_val, - (double) (ident->limit / (double) (ident->comm.remote_node_count + 1))); - } - printlog(LOG_DEBUG, "%.3f Aggregate weight/rate (FPS/GRD)\n", comm_val); + double aggregate = 0; + /* Read aggregate from comm layer. 
*/ + read_comm(&aggregate, &ident->effective_limit, &ident->comm, ident->limit); + printlog(LOG_DEBUG, "%.3f Aggregate weight/rate (FPS/GRD)\n", aggregate); + /* Experimental printing. */ printlog(LOG_DEBUG, "%.3f \t Kbps used rate. ID:%d\n", (double) ident->common.rate / (double) 128, ident->id); @@ -635,19 +456,19 @@ static void allocate(limiter_t *limiter, identity_t *ident) { if (limiter->policy == POLICY_FPS) { #ifdef SHADOW_ACCTING - allocate_fps_pretend(ident, comm_val, &ident->shadow_common, "SHADOW-ID"); + allocate_fps_pretend(ident, aggregate, &ident->shadow_common, "SHADOW-ID"); ident->last_localweight_copy = ident->localweight_copy; #endif - ident->locallimit = allocate_fps(ident, comm_val, &ident->common, "ID"); + ident->locallimit = allocate_fps(ident, aggregate, &ident->common, "ID"); ident->last_localweight = ident->localweight; /* Update other limiters with our weight by writing to comm layer. */ write_local_value(&ident->comm, ident->localweight); } else { ident->last_drop_prob = ident->drop_prob; - ident->drop_prob = allocate_grd(ident, comm_val); + ident->drop_prob = allocate_grd(ident, aggregate); /* Update other limiters with our rate by writing to comm layer. */ write_local_value(&ident->comm, ident->common.rate); diff --git a/drl/peer_comm.c b/drl/peer_comm.c index eba9637..fc2dc86 100644 --- a/drl/peer_comm.c +++ b/drl/peer_comm.c @@ -42,60 +42,51 @@ #include "peer_comm.h" #include "logging.h" -/* Artifically makes a network partition. 
*/ -int do_partition = 0; -int partition_set = 0xfffffff; +#define NULL_PEER (-2) +#define MESH_REMOTE_AWOL_THRESHOLD (5) +#define GOSSIP_REMOTE_AWOL_THRESHOLD (5) -extern limiter_t limiter; +/* From ulogd_DRL.c */ +extern int do_partition; +extern int partition_set; -static const uint32_t MAGIC_MSG = 0x123123; -static const uint32_t MAGIC_HELLO = 0x456456; -static const uint16_t MSG = 1; -static const uint16_t ACK = 2; +extern limiter_t limiter; -static void message_to_hbo(message_t *msg) { +void message_to_hbo(message_t *msg) { msg->magic = ntohl(msg->magic); msg->ident_id = ntohl(msg->ident_id); + /* value is a double */ + /* weight is a double */ msg->seqno = ntohl(msg->seqno); msg->min_seqno = ntohl(msg->min_seqno); msg->type = ntohs(msg->type); - /* value is a double */ - /* weight is a double */ + /* ping_source, ping_port, check_target, and check_port stay in nbo. */ + msg->checkack_value = ntohl(msg->checkack_value); + msg->update_present = ntohl(msg->update_present); + msg->reachability = ntohl(msg->reachability); + msg->incarnation = ntohl(msg->incarnation); + /* node has two fields, both stay in nbo. 
*/ + msg->view = ntohl(msg->view); } -static void message_to_nbo(message_t *msg) { +void message_to_nbo(message_t *msg) { msg->magic = htonl(msg->magic); msg->ident_id = htonl(msg->ident_id); + /* value is a double */ + /* weight is a double */ msg->seqno = htonl(msg->seqno); msg->min_seqno = htonl(msg->min_seqno); msg->type = htons(msg->type); - /* value is a double */ - /* weight is a double */ -} - -static void hello_to_hbo(hello_t *hello) { - hello->magic = ntohl(hello->magic); - hello->ident_id = ntohl(hello->ident_id); - hello->port = ntohs(hello->port); -} - -static void hello_to_nbo(hello_t *hello) { - hello->magic = htonl(hello->magic); - hello->ident_id = htonl(hello->ident_id); - hello->port = htons(hello->port); -} - -static int is_connected(remote_limiter_t *remote) { - struct sockaddr_in addr; - socklen_t addrlen = sizeof(addr); - - if (getpeername(remote->socket, (struct sockaddr *) &addr, &addrlen) == 0) - return 1; - else - return 0; + /* ping_source, ping_port, check_target, and check_port already in nbo. */ + msg->checkack_value = htonl(msg->checkack_value); + msg->update_present = htonl(msg->update_present); + msg->reachability = htonl(msg->reachability); + msg->incarnation = htonl(msg->incarnation); + /* node has two fields, both already in nbo. 
*/ + msg->view = htonl(msg->view); } -static int send_ack(identity_t *ident, remote_limiter_t *remote, uint32_t seqno) { +int send_ack(uint32_t id, remote_limiter_t *remote, uint32_t seqno, uint16_t type, int32_t view) { int result = 0; message_t msg; struct sockaddr_in toaddr; @@ -108,9 +99,10 @@ static int send_ack(identity_t *ident, remote_limiter_t *remote, uint32_t seqno) memset(&msg, 0, sizeof(msg)); msg.magic = MAGIC_MSG; - msg.ident_id = ident->id; - msg.type = ACK; + msg.ident_id = id; + msg.type = type; msg.seqno = seqno; + msg.view = view; message_to_nbo(&msg); @@ -171,230 +163,26 @@ void limiter_receive() { return; } - switch (ident->comm.comm_fabric) { - case COMM_MESH: { - /* Use the message's value to be our new GRDrate/FPSweight for the - * message's sender. */ - remote->rate = msg.value; - - /* Reset the AWOL counter to zero since we received an update. */ - remote->awol = 0; - } - break; - - case COMM_GOSSIP: { - if (msg.type == ACK) { - if (msg.seqno == remote->outgoing.next_seqno - 1) { - int i; - - /* Ack for most recent message. Clear saved state. */ - remote->outgoing.first_seqno = remote->outgoing.next_seqno; - remote->outgoing.saved_value = 0; - remote->outgoing.saved_weight = 0; - - for (i = 0; i < ident->comm.gossip.gossip_branch; ++i) { - if (ident->comm.retrys[i] >= 0 && - remote == &ident->comm.remote_limiters[ident->comm.retrys[i]]) { - ident->comm.retrys[i] = -2; - } - } - } - /* Ignore ack if it isn't for most recent message. */ - } else { - if (msg.min_seqno > remote->incoming.seen_seqno) { - /* Entirely new information */ - remote->incoming.seen_seqno = msg.seqno; - remote->incoming.saved_value = msg.value; - remote->incoming.saved_weight = msg.weight; - ident->comm.gossip.value += msg.value; - ident->comm.gossip.weight += msg.weight; - send_ack(ident, remote, msg.seqno); - remote->awol = 0; - } else if (msg.seqno > remote->incoming.seen_seqno) { - /* Only some of the message is old news. 
*/ - double diff_value = msg.value - remote->incoming.saved_value; - double diff_weight = msg.weight - remote->incoming.saved_weight; - - remote->incoming.seen_seqno = msg.seqno; - remote->incoming.saved_value = msg.value; - remote->incoming.saved_weight = msg.weight; - - ident->comm.gossip.value += diff_value; - ident->comm.gossip.weight += diff_weight; - send_ack(ident, remote, msg.seqno); - remote->awol = 0; - } else { - /* The entire message is old news. (Duplicate). */ - /* Do nothing. */ - } - } - } - break; - - default: { - printlog(LOG_CRITICAL, "ERR: Unknown identity comm fabric.\n"); - } - } - + /* Pass the message to the comm's recv function, which is responsible for + * processing its contents. */ + ident->comm.recv_function(&ident->comm, ident->id, limiter.udp_socket, remote, &msg); + pthread_mutex_unlock(&ident->comm.lock); pthread_rwlock_unlock(&limiter.limiter_lock); } -#if 0 -static void limiter_accept(comm_limiter_t *limiter) { - int sock, result; - struct sockaddr_in fromaddr; - socklen_t fromlen = sizeof(fromaddr); - remote_node_t sender; - remote_limiter_t *remote; - hello_t hello; - comm_ident_t *ident; - ident_handle *handle = NULL; - - sock = accept(limiter->tcp_socket, (struct sockaddr *)&fromaddr, &fromlen); - - assert(sock > 0); - - memset(&hello, 0, sizeof(hello_t)); - result = recv(sock, &hello, sizeof(hello_t), 0); +int recv_mesh(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) { + /* Use the message's value to be our new GRDrate/FPSweight for the + * message's sender. */ + remote->rate = msg->value; - if (result < 0) { - close(sock); - return; /* Failure - ignore it. */ - } - - assert(result == sizeof(hello_t)); + /* Reset the AWOL counter to zero since we received an update. 
*/ + remote->awol = 0; + remote->reachability = REACHABLE; - hello_to_hbo(&hello); - - assert(hello.magic == MAGIC_HELLO); - - memset(&sender, 0, sizeof(remote_node_t)); - sender.addr = fromaddr.sin_addr.s_addr; - sender.port = ntohs(hello.port); - - pthread_testcancel(); - - pthread_rwlock_rdlock(&limiter->rwlock); - - handle = map_search(limiter->ident_id_to_handle, (void *) &hello.ident_id, sizeof(hello.ident_id)); - - if (handle == NULL) { - printlog(LOG_WARN, "WARN:recvd hello for unknown identity.\n"); - pthread_rwlock_unlock(&limiter->rwlock); - return; - } - - ident = limiter->identities[*handle]; - assert(ident != NULL); - - pthread_mutex_lock(&ident->lock); - - remote = map_search(ident->remote_node_map, &sender, sizeof(remote_node_t)); - - if (remote == NULL) { - printlog(LOG_WARN, "WARN: Accepted connection from unknown identity.\n"); - pthread_mutex_unlock(&ident->lock); - pthread_rwlock_unlock(&limiter->rwlock); - close(sock); - return; - } - - if (is_connected(remote)) { - /* We are still connected, don't need the new socket. */ - close(sock); - pthread_mutex_unlock(&ident->lock); - pthread_rwlock_unlock(&limiter->rwlock); - return; - } - - /* We weren't connected, but we are now... 
*/ - remote->socket = sock; - printf("Got connection on: %d\n", sock); - FD_SET(sock, &ident->fds); - - pthread_mutex_unlock(&ident->lock); - pthread_rwlock_unlock(&limiter->rwlock); -} - -static void read_tcp_message(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock, int sock) { - int result; - message_t msg; - - memset(&msg, 0, sizeof(message_t)); - - result = recv(sock, &msg, sizeof(message_t), 0); - - if (result < 0) { - pthread_rwlock_rdlock(limiter_rwlock); - pthread_mutex_lock(&ident->lock); - FD_CLR(sock, &ident->fds); - close(sock); - pthread_mutex_unlock(&ident->lock); - pthread_rwlock_unlock(limiter_rwlock); - return; - } - - assert(result == sizeof(message_t)); - - message_to_hbo(&msg); - assert(msg.magic == MAGIC_MSG); - - pthread_rwlock_rdlock(limiter_rwlock); - pthread_mutex_lock(&ident->lock); - - switch (ident->comm_fabric) { - case COMM_GOSSIP: { - ident->gossip.value += msg.value; - ident->gossip.weight += msg.weight; - } - break; - - default: { - assert(1 == 0); /* This case shouldn't happen. Punt for now... */ - } - } - pthread_mutex_unlock(&ident->lock); - pthread_rwlock_unlock(limiter_rwlock); + return 0; } -static void ident_receive(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock) { - int select_result, i; - fd_set fds_copy; - struct timeval timeout; - - FD_ZERO(&fds_copy); - timeout.tv_sec = 15; - timeout.tv_usec = 0; - - pthread_rwlock_rdlock(limiter_rwlock); - pthread_mutex_lock(&ident->lock); - memcpy(&fds_copy, &ident->fds, sizeof(fd_set)); - pthread_mutex_unlock(&ident->lock); - pthread_rwlock_unlock(limiter_rwlock); - - /* mask interrupt signals for this thread? 
*/ - - select_result = select(FD_SETSIZE, &fds_copy, NULL, NULL, &timeout); - - assert(select_result >= 0); - - if (select_result == 0) - return; /* Timed out */ - - for (i = 0; (i < FD_SETSIZE) && select_result; ++i) { - if (FD_ISSET(i, &fds_copy)) { - read_tcp_message(ident, limiter_rwlock, i); - select_result--; - } - } -} -#endif - -/* Turn this on to simulate network partitions. - * Turn off for production settings. */ -//#define ALLOW_PARTITION - int send_udp_mesh(comm_t *comm, uint32_t id, int sock) { int result = 0; remote_limiter_t *remote; @@ -417,6 +205,7 @@ int send_udp_mesh(comm_t *comm, uint32_t id, int sock) { msg.magic = MAGIC_MSG; msg.ident_id = id; msg.value = comm->local_rate; + msg.view = comm->gossip.view; /* Do we want seqnos for mesh? We can get by without them. */ message_to_nbo(&msg); @@ -425,6 +214,14 @@ int send_udp_mesh(comm_t *comm, uint32_t id, int sock) { for (i = 0; i < comm->remote_node_count; ++i) { remote = &comm->remote_limiters[i]; + /* Increase this counter. For mesh, it represents the number of messages we have sent to + * this remote limiter without having heard from it. This is reset to 0 when we receive + * an update from this peer. */ + remote->awol += 1; + if (remote->awol > MESH_REMOTE_AWOL_THRESHOLD) { + remote->reachability = UNREACHABLE; + } + #ifdef ALLOW_PARTITION if (do_partition) { @@ -459,10 +256,82 @@ int send_udp_mesh(comm_t *comm, uint32_t id, int sock) { return result; } +int recv_gossip(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) { + if (msg->type == ACK) { + /* If ACK was received then reset the awol count */ + if (msg->seqno == remote->outgoing.next_seqno - 1) { + /* Ack for most recent message. Clear saved state. */ + remote->outgoing.first_seqno = remote->outgoing.next_seqno; + remote->outgoing.saved_value = 0; + remote->outgoing.saved_weight = 0; + + remote->awol = 0; + } + /* Ignore ack if it isn't for most recent message. 
*/ + } else if (msg->type == MSG) { + if (msg->min_seqno > remote->incoming.seen_seqno) { + /* Entirely new information */ + remote->incoming.seen_seqno = msg->seqno; + remote->incoming.saved_value = msg->value; + remote->incoming.saved_weight = msg->weight; + comm->gossip.value += msg->value; + comm->gossip.weight += msg->weight; + send_ack(id, remote, msg->seqno, ACK, comm->gossip.view); + remote->awol = 0; + } + else if (msg->seqno > remote->incoming.seen_seqno) { + /* Only some of the message is old news. */ + double diff_value = msg->value - remote->incoming.saved_value; + double diff_weight = msg->weight - remote->incoming.saved_weight; + + remote->incoming.seen_seqno = msg->seqno; + remote->incoming.saved_value = msg->value; + remote->incoming.saved_weight = msg->weight; + + comm->gossip.value += diff_value; + comm->gossip.weight += diff_weight; + send_ack(id, remote, msg->seqno, ACK, comm->gossip.view); + remote->awol = 0; + } + else { + /* The entire message is old news. (Duplicate). */ + /* Do nothing. */ + } + } + + return 0; +} + +int find_gossip_target(comm_t *comm) { + int target = NULL_PEER; + int k; + + if (comm->shuffle_index < comm->remote_node_count) { + target = comm->indices[comm->shuffle_index]; + printlog(LOG_DEBUG,"GOSSIP: found index %d.\n", target); + comm->shuffle_index++; + } + else { + // shuffle the remote_limiters array + printlog(LOG_DEBUG, "GOSSIP: shuffling the array.\n"); + for ( k = 0; k < comm->remote_node_count; k++) { + uint32_t l = myrand() % comm->remote_node_count; + int t; + t = comm->indices[l]; + comm->indices[l] = comm->indices[k]; + comm->indices[k] = t; + } + comm->shuffle_index = 0; + target = comm->indices[comm->shuffle_index]; + printlog(LOG_DEBUG,"GOSSIP: found index after spilling over %d.\n", target); + comm->shuffle_index++; + } + return target; +} + int send_udp_gossip(comm_t *comm, uint32_t id, int sock) { - int i, j, targetid; - int awol_threshold = GOSSIP_REMOTE_AWOL_THRESHOLD; - int rand_count; //HACK... 
+ int i, j; + int retry_index = 0; int result = 0; remote_limiter_t *remote; struct sockaddr_in toaddr; @@ -473,7 +342,7 @@ int send_udp_gossip(comm_t *comm, uint32_t id, int sock) { * that was sent to the peers. In the case of not being able to send to a * peer though, we increment this to reclaim the value/weight locally. */ int message_portion = 1; - + memset(&toaddr, 0, sizeof(struct sockaddr_in)); toaddr.sin_family = AF_INET; @@ -481,51 +350,63 @@ int send_udp_gossip(comm_t *comm, uint32_t id, int sock) { msg_weight = comm->gossip.weight / (comm->gossip.gossip_branch + 1); for (i = 0; i < comm->gossip.gossip_branch; ++i) { + int targetid = NULL_PEER; + int rand_count = 0; message_t msg; printlog(LOG_DEBUG, "Gossip loop iteration, i=%d, branch=%d\n", i, comm->gossip.gossip_branch); - if (comm->retrys[i] >= 0) { - remote = &comm->remote_limiters[comm->retrys[i]]; - targetid = comm->retrys[i]; - - if (remote->awol > awol_threshold) { - message_portion += 1; - printlog(LOG_DEBUG, "Gossip: Ignoring AWOL peer id %d.\n", comm->retrys[i]); - comm->retrys[i] = -1; - continue; + /* If there are any peers with unacked messages, select them first. */ + while (retry_index < comm->remote_node_count) { + if (comm->remote_limiters[retry_index].awol > 0 && comm->remote_limiters[retry_index].reachability == REACHABLE) { + targetid = retry_index; + printlog(LOG_DEBUG, "GOSSIP: Selected peerindex %d because it had unacked messages.\n", targetid); } - } else { - targetid = -2; - rand_count = 0; - - while (targetid == -2 && rand_count < 50) { - targetid = myrand() % comm->remote_node_count; - rand_count += 1; - - /* Don't select an already-used index. */ - for (j = 0; j < comm->gossip.gossip_branch; ++j) { - if (targetid == comm->retrys[j] || comm->remote_limiters[targetid].awol > awol_threshold) { - printlog(LOG_DEBUG, "Gossip: disqualified targetid %d. 
retrys[j] is %d, and remote awol count is %d\n", targetid, comm->retrys[j], comm->remote_limiters[targetid].awol); - targetid = -2; - break; - } + + retry_index += 1; + } + + while (targetid == NULL_PEER && rand_count < 10) { + /* Select a recipient from a randomly-shuffled array. */ + targetid = find_gossip_target(comm); + + assert(targetid != NULL_PEER); + + /* Don't select an already-used index. */ + for (j = 0; j < i; ++j) { + if (targetid == comm->selected[j]) { + printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d. selected[j=%d] is %d\n", targetid, j, comm->selected[j]); + targetid = NULL_PEER; + break; } } - if (targetid < 0) { - /* Couldn't find a suitable peer to send to... */ - message_portion += 1; - printlog(LOG_DEBUG, "Gossip: exhausted random peer search.\n"); - continue; - } else { - printlog(LOG_DEBUG, "Gossip: settled on peer id %d.\n", targetid); + /* Don't select an unreachable peer or one that is not in our view. */ + if (targetid != NULL_PEER) { + if (comm->remote_limiters[targetid].reachability != REACHABLE || + comm->remote_limiters[targetid].view != comm->gossip.view || + comm->remote_limiters[targetid].view_confidence != IN) { + printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d, reachability is %d, remote view is %d (confidence:%d), my view is %d\n", + targetid, comm->remote_limiters[targetid].reachability, comm->remote_limiters[targetid].view, + comm->remote_limiters[targetid].view_confidence, comm->gossip.view); + targetid = NULL_PEER; + } } - remote = &comm->remote_limiters[targetid]; + rand_count++; } - - comm->retrys[i] = targetid; + + if (targetid == NULL_PEER) { + /* Couldn't find a suitable peer to send to... 
*/ + message_portion += 1; + printlog(LOG_DEBUG, "GOSSIP: exhausted random peer search.\n"); + continue; + } else { + printlog(LOG_DEBUG, "GOSSIP: settled on peer id %d.\n", targetid); + } + + remote = &comm->remote_limiters[targetid]; + comm->selected[i] = targetid; toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */ toaddr.sin_port = remote->port; @@ -538,15 +419,19 @@ int send_udp_gossip(comm_t *comm, uint32_t id, int sock) { msg.seqno = remote->outgoing.next_seqno; msg.min_seqno = remote->outgoing.first_seqno; msg.type = MSG; - + msg.view = comm->gossip.view; + remote->outgoing.next_seqno++; remote->outgoing.saved_value += msg_value; remote->outgoing.saved_weight += msg_weight; + /* Represents the number of messages we have sent to this peer without hearing a message or ACK from it. */ + remote->awol += 1; + #ifdef ALLOW_PARTITION if (do_partition && ((partition_set & (1 << targetid)) == 0)) { - printlog(LOG_DEBUG, "Partition: Gossip: ignoring targetid %d\n", targetid); + printlog(LOG_DEBUG, "Partition: GOSSIP: ignoring targetid %d\n", targetid); continue; } @@ -560,14 +445,37 @@ int send_udp_gossip(comm_t *comm, uint32_t id, int sock) { break; } } - comm->gossip.value = msg_value * message_portion; comm->gossip.weight = msg_weight * message_portion; return result; } + +/* Old TCP code. 
*/ #if 0 +static void hello_to_hbo(hello_t *hello) { + hello->magic = ntohl(hello->magic); + hello->ident_id = ntohl(hello->ident_id); + hello->port = ntohs(hello->port); +} + +static void hello_to_nbo(hello_t *hello) { + hello->magic = htonl(hello->magic); + hello->ident_id = htonl(hello->ident_id); + hello->port = htons(hello->port); +} + +static int is_connected(remote_limiter_t *remote) { + struct sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + + if (getpeername(remote->socket, (struct sockaddr *) &addr, &addrlen) == 0) + return 1; + else + return 0; +} + int send_tcp_gossip(comm_ident_t *ident, FILE *logfile, int unused) { int i, targetid, sock; int result = 0; @@ -601,9 +509,7 @@ int send_tcp_gossip(comm_ident_t *ident, FILE *logfile, int unused) { return result; } -#endif -#if 0 void *limiter_accept_thread(void *limiter) { sigset_t signal_mask; @@ -686,4 +592,153 @@ void *ident_receive_thread(void *recv_args) { } pthread_exit(NULL); } + +static void limiter_accept(comm_limiter_t *limiter) { + int sock, result; + struct sockaddr_in fromaddr; + socklen_t fromlen = sizeof(fromaddr); + remote_node_t sender; + remote_limiter_t *remote; + hello_t hello; + comm_ident_t *ident; + ident_handle *handle = NULL; + + sock = accept(limiter->tcp_socket, (struct sockaddr *)&fromaddr, &fromlen); + + assert(sock > 0); + + memset(&hello, 0, sizeof(hello_t)); + result = recv(sock, &hello, sizeof(hello_t), 0); + + if (result < 0) { + close(sock); + return; /* Failure - ignore it. 
*/ + } + + assert(result == sizeof(hello_t)); + + hello_to_hbo(&hello); + + assert(hello.magic == MAGIC_HELLO); + + memset(&sender, 0, sizeof(remote_node_t)); + sender.addr = fromaddr.sin_addr.s_addr; + sender.port = ntohs(hello.port); + + pthread_testcancel(); + + pthread_rwlock_rdlock(&limiter->rwlock); + + handle = map_search(limiter->ident_id_to_handle, (void *) &hello.ident_id, sizeof(hello.ident_id)); + + if (handle == NULL) { + printlog(LOG_WARN, "WARN:recvd hello for unknown identity.\n"); + pthread_rwlock_unlock(&limiter->rwlock); + return; + } + + ident = limiter->identities[*handle]; + assert(ident != NULL); + + pthread_mutex_lock(&ident->lock); + + remote = map_search(ident->remote_node_map, &sender, sizeof(remote_node_t)); + + if (remote == NULL) { + printlog(LOG_WARN, "WARN: Accepted connection from unknown identity.\n"); + pthread_mutex_unlock(&ident->lock); + pthread_rwlock_unlock(&limiter->rwlock); + close(sock); + return; + } + + if (is_connected(remote)) { + /* We are still connected, don't need the new socket. */ + close(sock); + pthread_mutex_unlock(&ident->lock); + pthread_rwlock_unlock(&limiter->rwlock); + return; + } + + /* We weren't connected, but we are now... 
*/ + remote->socket = sock; + printf("Got connection on: %d\n", sock); + FD_SET(sock, &ident->fds); + + pthread_mutex_unlock(&ident->lock); + pthread_rwlock_unlock(&limiter->rwlock); +} + +static void read_tcp_message(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock, int sock) { + int result; + message_t msg; + + memset(&msg, 0, sizeof(message_t)); + + result = recv(sock, &msg, sizeof(message_t), 0); + + if (result < 0) { + pthread_rwlock_rdlock(limiter_rwlock); + pthread_mutex_lock(&ident->lock); + FD_CLR(sock, &ident->fds); + close(sock); + pthread_mutex_unlock(&ident->lock); + pthread_rwlock_unlock(limiter_rwlock); + return; + } + + assert(result == sizeof(message_t)); + + message_to_hbo(&msg); + assert(msg.magic == MAGIC_MSG); + + pthread_rwlock_rdlock(limiter_rwlock); + pthread_mutex_lock(&ident->lock); + + switch (ident->comm_fabric) { + case COMM_GOSSIP: { + ident->gossip.value += msg.value; + ident->gossip.weight += msg.weight; + } + break; + + default: { + assert(1 == 0); /* This case shouldn't happen. Punt for now... */ + } + } + pthread_mutex_unlock(&ident->lock); + pthread_rwlock_unlock(limiter_rwlock); +} + +static void ident_receive(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock) { + int select_result, i; + fd_set fds_copy; + struct timeval timeout; + + FD_ZERO(&fds_copy); + timeout.tv_sec = 15; + timeout.tv_usec = 0; + + pthread_rwlock_rdlock(limiter_rwlock); + pthread_mutex_lock(&ident->lock); + memcpy(&fds_copy, &ident->fds, sizeof(fd_set)); + pthread_mutex_unlock(&ident->lock); + pthread_rwlock_unlock(limiter_rwlock); + + /* mask interrupt signals for this thread? 
*/ + + select_result = select(FD_SETSIZE, &fds_copy, NULL, NULL, &timeout); + + assert(select_result >= 0); + + if (select_result == 0) + return; /* Timed out */ + + for (i = 0; (i < FD_SETSIZE) && select_result; ++i) { + if (FD_ISSET(i, &fds_copy)) { + read_tcp_message(ident, limiter_rwlock, i); + select_result--; + } + } +} #endif diff --git a/drl/peer_comm.h b/drl/peer_comm.h index c8fd2e1..0e28386 100644 --- a/drl/peer_comm.h +++ b/drl/peer_comm.h @@ -3,18 +3,36 @@ #ifndef _PEER_COMM_H_ #define _PEER_COMM_H_ +#define NULL_PEER (-2) +#define MESH_REMOTE_AWOL_THRESHOLD (5) +#define GOSSIP_REMOTE_AWOL_THRESHOLD (5) + +static const uint32_t MAGIC_MSG = 0x123123; +static const uint32_t MAGIC_HELLO = 0x456456; +static const uint16_t MSG = 1; +static const uint16_t ACK = 2; + void limiter_receive(); -#if 0 -void *limiter_accept_thread(void *limiter); +void message_to_hbo(message_t *msg); -void *ident_receive_thread(void *limiter); -#endif +void message_to_nbo(message_t *msg); int send_udp_mesh(comm_t *comm, uint32_t id, int sock); +int recv_mesh(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg); + int send_udp_gossip(comm_t *comm, uint32_t id, int sock); +int recv_gossip(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg); + +int find_gossip_target(comm_t *comm); + +int send_ack(uint32_t id, remote_limiter_t *remote, uint32_t seqno, uint16_t type, int32_t view); #if 0 +void *limiter_accept_thread(void *limiter); + +void *ident_receive_thread(void *limiter); + int send_tcp_gossip(comm_ident_t *ident, FILE *logfile, int unused); #endif diff --git a/drl/raterouter.h b/drl/raterouter.h index 37f0141..5e89e89 100644 --- a/drl/raterouter.h +++ b/drl/raterouter.h @@ -34,8 +34,10 @@ enum policies { POLICY_GRD = 1, POLICY_FPS = 2 }; enum commfabrics { COMM_MESH = 1, COMM_GOSSIP = 2 }; -enum accountings { ACT_STANDARD = 1, ACT_SAMPLEHOLD = 2, ACT_SIMPLE = 3, ACT_MULTIPLE = 4}; -enum dampenings { DAMPEN_NONE = 0, 
DAMPEN_TEST = 1, DAMPEN_FAILED = 2, DAMPEN_PASSED = 3, DAMPEN_SKIP = 4}; +enum accountings { ACT_STANDARD = 1, ACT_SAMPLEHOLD = 2, ACT_SIMPLE = 3, ACT_MULTIPLE = 4 }; +enum dampenings { DAMPEN_NONE = 0, DAMPEN_TEST = 1, DAMPEN_FAILED = 2, DAMPEN_PASSED = 3, DAMPEN_SKIP = 4 }; +enum memberships { SWIM = 1, ZOOKEEPER = 2 }; +enum failure_behaviors { PANIC = 1, QUORUM = 2 }; /* The comm library also has definitions for comfabrics. This prevents us * from defining them twice. */ @@ -83,6 +85,10 @@ enum dampenings { DAMPEN_NONE = 0, DAMPEN_TEST = 1, DAMPEN_FAILED = 2, DAMPEN_PA * any type of production setting. */ //#define SHADOW_ACCTING +/* Turn this on to simulate network partitions. + * Turn off for production settings. */ +#define ALLOW_PARTITION + /* forward declare some structs */ struct limiter; struct identity; diff --git a/drl/ratetypes.h b/drl/ratetypes.h index c4f6b90..5430c2d 100644 --- a/drl/ratetypes.h +++ b/drl/ratetypes.h @@ -36,6 +36,10 @@ typedef struct identity { /** The global rate limit. */ uint32_t limit; + /** The effective global rate limit. Can be lower than the nominal global + * rate limit due to the failure of one or more peers. */ + uint32_t effective_limit; + /** The local rate limit. */ uint32_t locallimit; diff --git a/drl/swim.c b/drl/swim.c new file mode 100644 index 0000000..9993ec6 --- /dev/null +++ b/drl/swim.c @@ -0,0 +1,853 @@ +/* See the DRL-LICENSE file for this file's software license. */ + +#include + +/* Debug output. */ +#include +#include + +/* Socket functions. */ +#include +#include + +/* Byte ordering and address structures. */ +#include + +/* memset() */ +#include + +#include "raterouter.h" +#include "ratetypes.h" +#include "drl_state.h" +#include "peer_comm.h" +#include "swim.h" +#include "logging.h" + +/* From ulogd_DRL.c */ +extern int do_partition; +extern int partition_set; + +extern limiter_t limiter; + +/**Finds the update, if found then frees the memory of the new_update + * and returns 1. 
If find fails then this returns 0*/ +static int find_and_update(update_t *updates, update_t *new_update) { + if( updates == NULL ) { + printlog(LOG_DEBUG, "SWIM: INFECT: no existing updates\n"); + return 0; + } + update_t *pointer = updates; + while(pointer != NULL) { + if(pointer->remote == new_update->remote && pointer->remote->incarnation >= new_update->remote->incarnation) { + pointer->count = 0; + printlog(LOG_DEBUG, "SWIM: INFECT: update already exists\n"); + free(new_update); + return 1; + } + pointer = pointer->next; + } + printlog(LOG_DEBUG, "SWIM: INFECT: update not found among existing updates\n"); + return 0; +} + +/*Just adds to the end of list and returns the head*/ +update_t *add_to_list(update_t *updates, update_t *new_update) { + printlog(LOG_DEBUG, "SWIM: INFECT: adding to list of updates: %s is %d\n", inet_ntoa(*(struct in_addr *)&new_update->remote->addr), new_update->remote->reachability); + update_t *head = updates; + update_t *pointer; + if (head == NULL) { + head = new_update; + } else { + pointer = head; + while(pointer->next != NULL) { + pointer = pointer->next; + } + pointer->next = new_update; + } + return head; +} + +/** Given the address of the suspected node this function + * identifies friends who can probe the suspected node + * After recording these, the messages for help are sent + * in the next gossip round*/ +static int help_from_friends(comm_t *comm, int suspect_index, uint32_t id, int sock) { + printlog(LOG_DEBUG,"SWIM: In function help_from_friends suspected node: %s, index: %d\n",inet_ntoa(*(struct in_addr *)&comm->remote_limiters[suspect_index].addr), suspect_index); + int i=0, j = 0; + int result; + int count_friends = (comm->remote_node_count > MAX_FRIENDS ) ? MAX_FRIENDS : comm->remote_node_count; // A more logical way? 
+ // remote_node_t friend_nodes[count_friends]; + // int friend_ids[count_friends]; + + while( (i - j) < count_friends && i < comm->remote_node_count ) { + // Do not pick a friend who is suspected to be down + if (comm->remote_limiters[i].reachability != REACHABLE) { + j++; i++; + continue; + } + /**construct the message and send it to friend + * pick up friend i*/ + message_t check_msg; + memset(&check_msg, 0, sizeof(message_t)); + check_msg.magic = MAGIC_MSG; + check_msg.ident_id = id; + check_msg.value = 0; + check_msg.weight = 0; + check_msg.seqno = 0; + check_msg.min_seqno = 0; + check_msg.type = CHECK; + check_msg.check_target = comm->remote_limiters[suspect_index].addr; + check_msg.check_port = comm->remote_limiters[suspect_index].port; + + // send the message + struct sockaddr_in toaddr; + memset(&toaddr, 0, sizeof(struct sockaddr_in)); + toaddr.sin_family = AF_INET; + toaddr.sin_addr.s_addr = comm->remote_limiters[i].addr; + toaddr.sin_port = comm->remote_limiters[i].port; + message_to_nbo(&check_msg); + + printlog(LOG_DEBUG,"SWIM: Sending CHECK message to friend %s i: %d", inet_ntoa(*(struct in_addr *)&toaddr.sin_addr.s_addr), i); + printlog(LOG_DEBUG," Suspect: %s", inet_ntoa(*(struct in_addr *)&check_msg.check_target)); + printlog(LOG_DEBUG," Initial: %s suspect_index: %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[suspect_index].addr), suspect_index); + if (sendto(sock, &check_msg, sizeof(check_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) { + printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n"); + result = errno; + printlog(LOG_WARN, " - The error was |%d|\n", strerror(result)); + } + i++; + } + printlog(LOG_DEBUG,"SWIM: Out function help_from_friends.\n"); + return 0; +} + +/** Receiving CHECK packet*/ +static void swim_receive_check(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) { + +#ifdef ALLOW_PARTITION + int id; + for(id = 0; id < comm->remote_node_count; id++) { + 
if(comm->remote_limiters[id].addr == remote->addr && comm->remote_limiters[id].port == remote->port) { + if (do_partition && ((partition_set & (1 << id)) == 0)) { + printlog(LOG_DEBUG, "SWIM: Ignoring CHECK message from %d\n", id); + return; + } + } + } +#endif + +//FIX + if(remote->reachability != REACHABLE) + return; + + swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state; + + // create the message that has to be sent to the suspected node + printlog(LOG_DEBUG,"SWIM: received CHECK message from %s", inet_ntoa(*(struct in_addr *)&remote->addr)); + printlog(LOG_DEBUG,", sending PING to %s\n", inet_ntoa(*(struct in_addr *)&msg->check_target)); + int result; + message_t ping_msg; + memset(&ping_msg, 0, sizeof(message_t)); + ping_msg.magic = MAGIC_MSG; + ping_msg.ident_id = msg->ident_id; + ping_msg.value = 0; + ping_msg.weight = 0; + ping_msg.seqno = 0; + ping_msg.min_seqno = 0; + ping_msg.type = PING; + ping_msg.ping_source = remote->addr; + ping_msg.ping_port = remote->port; + // send the ping message + struct sockaddr_in toaddr; + memset(&toaddr, 0, sizeof(struct sockaddr_in)); + toaddr.sin_family = AF_INET; + toaddr.sin_addr.s_addr = msg->check_target; + toaddr.sin_port = msg->check_port; + message_to_nbo(&ping_msg); + + /** add to ping targets before sending message */ + ping_target_t *suspect = (ping_target_t*) malloc(sizeof(ping_target_t)); + memset(suspect, 0, sizeof(ping_target_t)); + suspect->target.addr = msg->check_target; + suspect->target.port = msg->check_port; + suspect->source.addr = remote->addr; + suspect->source.port = remote->port; + printlog(LOG_DEBUG, "SWIM: adding %s to PING list\n", inet_ntoa(*(struct in_addr *)&suspect->target.addr)); + + ping_target_t *pointer = swim_comm->ping_targets; + if( swim_comm->ping_targets != NULL ) { + while(pointer->next != NULL) { + pointer = pointer->next; + } + pointer->next = suspect; + } + else { + swim_comm->ping_targets = suspect; + } + /** added to the end of the list of ping_targets */ + + 
if (sendto(limiter.udp_socket, &ping_msg, sizeof(ping_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) { + printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n"); + result = errno; + printlog(LOG_WARN, " - The error was |%d|\n", strerror(result)); + } else { + printlog(LOG_DEBUG,"SWIM: Sent PING message\n"); + } + printlog(LOG_DEBUG,"SWIM: Processed CHECK packet\n"); + return; +} + +/** Receiving PING packet*/ +static void swim_receive_ping(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) { + +#ifdef ALLOW_PARTITION + int id; + for(id = 0; id < comm->remote_node_count; id++) { + if(comm->remote_limiters[id].addr == remote->addr && comm->remote_limiters[id].port == remote->port) { + if (do_partition && ((partition_set & (1 << id)) == 0)) { + printlog(LOG_DEBUG, "SWIM: Ignoring PING message from %d\n", id); + return; + } + } + } +#endif + + printlog(LOG_DEBUG,"SWIM: receiving the PING message from %s\n", inet_ntoa(*(struct in_addr *)&remote->addr)); + +//FIX + if(remote->reachability != REACHABLE) + return; + + int result; + swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state; + + message_t pingack_msg; + memset(&pingack_msg, 0, sizeof(message_t)); + pingack_msg.magic = MAGIC_MSG; + pingack_msg.ident_id = msg->ident_id; + pingack_msg.value = 0; + pingack_msg.weight = 0; + pingack_msg.seqno = 0; + pingack_msg.min_seqno = 0; + pingack_msg.type = PING_ACK; + + swim_comm->incarnation++; + pingack_msg.update_present = TRUE; + pingack_msg.reachability = REACHABLE; + pingack_msg.incarnation = swim_comm->incarnation; + FILE *fp = fopen("/root/incarnation", "w+"); + fprintf(fp, "%d", swim_comm->incarnation + 1); + fflush(fp); + fclose(fp); + // send PING_ACK + struct sockaddr_in toaddr; + memset(&toaddr, 0, sizeof(struct sockaddr_in)); + toaddr.sin_family = AF_INET; + toaddr.sin_addr.s_addr = remote->addr; + toaddr.sin_port = remote->port; + message_to_nbo(&pingack_msg); + + if (sendto(limiter.udp_socket, 
&pingack_msg, sizeof(pingack_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) { + printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n"); + result = errno; + printlog(LOG_WARN, " - The error was |%d|\n", strerror(result)); + } else { + printlog(LOG_DEBUG, "SWIM: sent PING_ACK to %s\n", inet_ntoa(*(struct in_addr *)&remote->addr)); + } + printlog(LOG_DEBUG,"SWIM: Processed PING packet\n"); + return; +} + +/** Receiving PING_ACK packet*/ +static void swim_receive_pingack(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) { + // find the source which requested this ping and inform it with CHECK_ACK, ALIVE + // look up in the ping_targets list. + printlog(LOG_DEBUG, "SWIM: receiving the PING_ACK message from %s\n", inet_ntoa(*(struct in_addr *)&remote->addr)); + flushlog(); + int result, confirm; + int delete_head = 0; + ping_target_t* pointer; + ping_target_t* prev_pointer; + swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state; + pointer = swim_comm->ping_targets; + prev_pointer = swim_comm->ping_targets; + +/* + * Removed this because if a PING_ACK arrives then CHECK_ACK would be + * sent to all the nodes which requested a check on this node. 
Hence pointer + * could be NULL but we could receive a CHECK_ACK packet + * if (pointer == NULL) { + printlog(LOG_DEBUG, "SWIM: Received PING_ACK for a PING not sent\n"); + return; + } +*/ + while(pointer!=NULL) { + if(pointer->target.addr == remote->addr && pointer->target.port == remote->port) { + // suspect has been found in the list + // now construct the CHECK_ACK message and send it to source + message_t checkack_msg; + memset(&checkack_msg, 0, sizeof(message_t)); + checkack_msg.magic = MAGIC_MSG; + checkack_msg.ident_id = msg->ident_id; + checkack_msg.value = 0; + checkack_msg.weight = 0; + checkack_msg.seqno = 0; + checkack_msg.min_seqno = 0; + checkack_msg.type = CHECK_ACK; + checkack_msg.checkack_value = ALIVE; + // inform the source of the addr and port of suspected node + checkack_msg.check_target = remote->addr; + checkack_msg.check_port = remote->port; + struct sockaddr_in toaddr; + memset(&toaddr, 0, sizeof(struct sockaddr_in)); + // found source + toaddr.sin_family = AF_INET; + toaddr.sin_addr.s_addr = pointer->source.addr; + toaddr.sin_port = pointer->source.port; + message_to_nbo(&checkack_msg); + + if (sendto(limiter.udp_socket, &checkack_msg, sizeof(checkack_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) { + printlog(LOG_WARN, "WARN: swim_receive_pingack : sento failed.\n"); + result = errno; + printlog(LOG_WARN, " - The error was |%d|\n", strerror(result)); + } + + /** Now delete this suspect from friends list of nodes*/ + if(prev_pointer == pointer) { + swim_comm->ping_targets = pointer->next; + pointer->next = NULL; + free(pointer); + pointer = swim_comm->ping_targets; + delete_head = 1; + } else { + prev_pointer->next = pointer->next; + pointer->next = NULL; + free(pointer); + pointer = prev_pointer; + } + confirm = 1; + } + prev_pointer = pointer; + if(pointer != NULL && delete_head != 1) { + pointer = pointer->next; + } + delete_head = 0; + printf("SWIM: PING ACK\n"); + } + // PING_ACK has been received then add to 
the list of updates + remote_node_t sender; + memset(&sender, 0, sizeof(remote_node_t)); + sender.addr = remote->addr; + sender.port = remote->port; + update_t* new_update = (update_t *) malloc(sizeof(update_t)); + memset(new_update, 0, sizeof(update_t)); + new_update->remote = map_search(comm->remote_node_map, &sender, sizeof(remote_node_t)); + if(new_update->remote == NULL) { + printlog(LOG_DEBUG, "SWIM: PANIC: PING_ACK received from an unknown node %s\n",inet_ntoa(*(struct in_addr *)&sender.addr)); + } + new_update->count = 0; + + if(msg->incarnation > new_update->remote->incarnation) { + new_update->remote->incarnation = msg->incarnation; + new_update->remote->reachability = REACHABLE; + new_update->remote->awol = 0; + new_update->remote->count_rounds = 0; + new_update->remote->count_awol = 0; + new_update->remote->count_alive = 0; + if( find_and_update(swim_comm->updates, new_update) == 0 ) { + swim_comm->updates = add_to_list(swim_comm->updates, new_update); + swim_comm->count_updates++; + } + } else if(msg->incarnation == new_update->remote->incarnation) { + // if the node previously thought that sender was down then it prevails + // else if the node thought it was up then there is no change + } + + if(confirm != 1) printlog(LOG_DEBUG,"SWIM: PING_ACK did not match entries in list\n"); + printlog(LOG_DEBUG,"SWIM: Processed PING_ACK packet\n"); + return; +} + +/** Receiving CHECK_ACK packet*/ +static void swim_receive_checkack(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) { + printlog(LOG_DEBUG, "SWIM: receiving the CHECK_ACK message from %s\n", inet_ntoa(*(struct in_addr *)&remote->addr)); + int i; + for( i = 0; i < comm->remote_node_count; i++) { + if(comm->remote_limiters[i].addr == msg->check_target && comm->remote_limiters[i].port == msg->check_port) { + if(msg->checkack_value == ALIVE) + comm->remote_limiters[i].count_alive++; + else if (msg->checkack_value == AWOL) + comm->remote_limiters[i].count_awol++; + } + } + 
printlog(LOG_DEBUG,"SWIM: Processed CHECK_ACK packet\n"); + return; +} + +static int swim_send(comm_t *comm, int id, int sock) { + int i, result; + swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state; + + /**SOURCE: Send messages to friends to check on + * nodes which are suspected to be down*/ + for(i = 0; i < comm->remote_node_count; i++) { + printlog(LOG_DEBUG, "SWIM: AWOL count of %s is %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), comm->remote_limiters[i].awol); + if(comm->remote_limiters[i].awol == GOSSIP_REMOTE_AWOL_THRESHOLD) { + // HACK to make sure this part of code is entered only once + comm->remote_limiters[i].reachability = SUSPECT; + comm->remote_limiters[i].awol++; + help_from_friends(comm, i, id, sock); + } + } + + /**SOURCE: Count number of rounds since the node has been suspected + * If in this process the count reaches threshold then take action + */ + for (i = 0; i < comm->remote_node_count; i++) { + if(comm->remote_limiters[i].reachability == SUSPECT) { + comm->remote_limiters[i].count_rounds++; + printlog(LOG_DEBUG, "SWIM: ROUNDS count on %s index %d is %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), i, comm->remote_limiters[i].count_rounds); + if(comm->remote_limiters[i].count_rounds > SOURCE_THRESHOLD) { + if(comm->remote_limiters[i].count_alive > 0) { + printlog(LOG_DEBUG,"SWIM: the node %s was up, wrongly suspected\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr)); + comm->remote_limiters[i].reachability = REACHABLE; + comm->remote_limiters[i].count_rounds = 0; + comm->remote_limiters[i].count_awol = 0; + comm->remote_limiters[i].count_alive = 0; + // FIX + comm->remote_limiters[i].awol = 0; + } + else if (comm->remote_limiters[i].count_awol > 0) { + comm->remote_limiters[i].reachability = UNREACHABLE; + update_t* new_update = (update_t *) malloc(sizeof(update_t)); + memset(new_update, 0, sizeof(update_t)); + new_update->remote = &comm->remote_limiters[i]; + 
new_update->count = 0; + // comm->remote_limiters[i].incoming.seen_seqno = 0; + printlog(LOG_DEBUG, "SWIM: INFECT: Update down information %s\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr)); + if(find_and_update(swim_comm->updates, new_update) == 0) { + swim_comm->updates = add_to_list(swim_comm->updates, new_update); + swim_comm->count_updates++; + } + printlog(LOG_DEBUG,"SWIM: The node %s is down. reachability: %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), comm->remote_limiters[i].reachability);// The node is check_list.target + } + else { + /**Even friends have not responded, request for help from more friends?*/ + printlog(LOG_DEBUG,"SWIM: Last ditch effort, even friends did not respond\n"); + comm->remote_limiters[i].reachability = SUSPECT; + comm->remote_limiters[i].count_rounds = 0; // CHECK + help_from_friends(comm, i, id, sock); + } + } + } + } + + /**Actions performed by "FRIEND"*/ + // DELETE THIS LOOP + ping_target_t* ping_list = swim_comm->ping_targets; + while(ping_list != NULL) { + printlog(LOG_DEBUG, "SWIM: in PING list %s\n", inet_ntoa(*(struct in_addr *)&ping_list->target.addr)); + ping_list = ping_list->next; + } + ping_list = swim_comm->ping_targets; + ping_target_t* ping_list_prev = swim_comm->ping_targets; + int delete_head = 0; + while(ping_list != NULL) { + ping_list->count++; + printlog(LOG_DEBUG,"SWIM: friend keeping track of gossip rounds since PING.\n"); + // if in this process some node hits threshold then + // send AWOL and delete it from this list + if(ping_list->count >= FRIEND_THRESHOLD) { + printlog(LOG_DEBUG,"SWIM: friend declaring AWOL.\n"); + message_t checkack_msg; + memset(&checkack_msg, 0, sizeof(message_t)); + checkack_msg.magic = MAGIC_MSG; + checkack_msg.ident_id = id; + checkack_msg.value = 0; + checkack_msg.weight = 0; + checkack_msg.seqno = 0; + checkack_msg.min_seqno = 0; + checkack_msg.type = CHECK_ACK; + checkack_msg.checkack_value = AWOL; + // inform the source of the addr 
and port of suspected node + checkack_msg.check_target = ping_list->target.addr; + checkack_msg.check_port = ping_list->target.port; + struct sockaddr_in toaddr; + memset(&toaddr, 0, sizeof(struct sockaddr_in)); + toaddr.sin_family = AF_INET; + toaddr.sin_addr.s_addr = ping_list->source.addr; + toaddr.sin_port = ping_list->source.port; + message_to_nbo(&checkack_msg); + + if (sendto(sock, &checkack_msg, sizeof(checkack_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) { + printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n"); + result = errno; + printlog(LOG_WARN, " - The error was |%d|\n", strerror(result)); + } + // now the deletion: after deletion we want to continue from next node + printlog(LOG_DEBUG, "SWIM: deleting from PING list %s\n", inet_ntoa(*(struct in_addr *)&ping_list->target.addr)); + if(ping_list_prev == ping_list) { + swim_comm->ping_targets = ping_list->next; + ping_list->next = NULL; + free(ping_list); + ping_list = swim_comm->ping_targets; + delete_head = 1; + } else { + ping_list_prev->next = ping_list->next; + ping_list->next = NULL; + free(ping_list); + ping_list = ping_list_prev; + } + } + ping_list_prev = ping_list; + if (ping_list != NULL && delete_head != 1) ping_list = ping_list->next; + delete_head = 0; + } + + return 0; +} + + +/** Handle SWIM packets received*/ +int swim_receive(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) { + swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state; + + if (msg->type == ACK) { + /* If ACK was received then reset the awol count */ + if (msg->seqno == remote->outgoing.next_seqno - 1) { + /* Ack for most recent message. Clear saved state. */ + remote->outgoing.first_seqno = remote->outgoing.next_seqno; + remote->outgoing.saved_value = 0; + remote->outgoing.saved_weight = 0; + + remote->awol = 0; + remote->count_awol = 0; + } + /* Ignore ack if it isn't for most recent message. 
*/ + } else if (msg->type == MSG) { + if (msg->min_seqno > remote->incoming.seen_seqno) { + /* Entirely new information */ + remote->incoming.seen_seqno = msg->seqno; + remote->incoming.saved_value = msg->value; + remote->incoming.saved_weight = msg->weight; + comm->gossip.value += msg->value; + comm->gossip.weight += msg->weight; + send_ack(id, remote, msg->seqno, ACK, comm->gossip.view); + remote->awol = 0; + remote->count_rounds = 0; + remote->count_awol = 0; + remote->count_alive = 0; + + // check if there is an update piggy backed on this message + // if yes then add it to the update list + if(msg->update_present > 0) { + update_t *new_update = (update_t *) malloc(sizeof(update_t)); + memset(new_update, 0, sizeof(update_t)); + // look for the remote limiter about whom update is being + // sent the update node is sent in the message, msg->node! + remote_limiter_t *temp_remote = map_search(comm->remote_node_map, &msg->node, sizeof(remote_node_t)); + // an update about the node itself is possible in which case map_search would fail + if(temp_remote != NULL) { + printlog(LOG_DEBUG, "SWIM: INFECT: Receiving update about %s. Update says %d\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr), msg->reachability); + new_update->remote = temp_remote; + new_update->count = 0; + if(msg->incarnation > new_update->remote->incarnation) { + printlog(LOG_DEBUG, "SWIM: INFECT: Receiving update about %s. Update is about new incarnation\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr)); + new_update->remote->reachability = msg->reachability; + new_update->remote->incarnation = msg->incarnation; + if(find_and_update(swim_comm->updates, new_update) == 0) { + swim_comm->updates = add_to_list(swim_comm->updates, new_update); + swim_comm->count_updates++; + } + } else if(msg->incarnation == new_update->remote->incarnation && new_update->remote->reachability == REACHABLE && msg->reachability == UNREACHABLE) { + printlog(LOG_DEBUG, "SWIM: INFECT: Receiving update about %s. 
Update about same incarnation, says node unreachable\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr)); + new_update->remote->reachability = msg->reachability; + if(find_and_update(swim_comm->updates, new_update) == 0) { + swim_comm->updates = add_to_list(swim_comm->updates, new_update); + swim_comm->count_updates++; + } else { + // Ignore the update + printlog(LOG_DEBUG, "SWIM: INFECT: update about %s ignored\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr)); + } + } + } + } + } + else if (msg->seqno > remote->incoming.seen_seqno) { + /* Only some of the message is old news. */ + double diff_value = msg->value - remote->incoming.saved_value; + double diff_weight = msg->weight - remote->incoming.saved_weight; + + remote->incoming.seen_seqno = msg->seqno; + remote->incoming.saved_value = msg->value; + remote->incoming.saved_weight = msg->weight; + + comm->gossip.value += diff_value; + comm->gossip.weight += diff_weight; + send_ack(id, remote, msg->seqno, ACK, comm->gossip.view); + remote->awol = 0; + remote->count_awol = 0; + } + else { + /* The entire message is old news. (Duplicate). */ + /* Do nothing. 
*/ + } + // Hearing from a node previously declared unreachable + if(remote->reachability == UNREACHABLE) { + printlog(LOG_DEBUG, "SWIM: INFECT: seems like %s is back up\n", inet_ntoa(*(struct in_addr *)&remote->addr)); + remote->reachability = SUSPECT; + remote->awol = GOSSIP_REMOTE_AWOL_THRESHOLD; + remote->count_rounds = 0; + remote->count_awol = 0; + remote->count_alive = 0; + } + } + else if(msg->type == CHECK) { + swim_receive_check(comm, sock, remote, msg); + } + else if(msg->type == PING ) { + swim_receive_ping(comm, sock, remote, msg); + } + else if(msg->type == PING_ACK) { + swim_receive_pingack(comm, sock, remote, msg); + } + else if(msg->type == CHECK_ACK) { + swim_receive_checkack(comm, sock, remote, msg); + } + return 0; +} + +int send_gossip_swim(comm_t *comm, uint32_t id, int sock) { + int i, j; + int retry_index = 0; + int result = 0; + remote_limiter_t *remote; + struct sockaddr_in toaddr; + double msg_value, msg_weight; + + /* This is the factor for the portion of value/weight to keep locally. + * Normally this is 1, meaning that we retain the same amount of value/weight + * that was sent to the peers. In the case of not being able to send to a + * peer though, we increment this to reclaim the value/weight locally. 
*/ + int message_portion = 1; + + memset(&toaddr, 0, sizeof(struct sockaddr_in)); + toaddr.sin_family = AF_INET; + + msg_value = comm->gossip.value / (comm->gossip.gossip_branch + 1); + msg_weight = comm->gossip.weight / (comm->gossip.gossip_branch + 1); + + /*Nodes to which message was sent will have a non-zero here*/ + /* for (i = 0; i < comm->remote_node_count; i++) { + if(comm->remote_limiters[i].awol > 0) + comm->remote_limiters[i].awol++; + }*/ + + for (i = 0; i < comm->remote_node_count; i++) { + printlog(LOG_DEBUG, "SWIM: STATUS: Node: %s reachability: %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), comm->remote_limiters[i].reachability); + } + + for (i = 0; i < comm->gossip.gossip_branch; ++i) { + int targetid = NULL_PEER; + int rand_count = 0; + message_t msg; + + printlog(LOG_DEBUG, "Gossip loop iteration, i=%d, branch=%d\n", i, comm->gossip.gossip_branch); + + /* If there are any peers with unacked messages, select them first. */ + while (retry_index < comm->remote_node_count) { + if (comm->remote_limiters[retry_index].awol > 0 && comm->remote_limiters[retry_index].reachability == REACHABLE) { + targetid = retry_index; + printlog(LOG_DEBUG, "GOSSIP: Selected peerindex %d because it had unacked messages.\n", targetid); + retry_index += 1; + break; + } + retry_index += 1; + } + + while (targetid == NULL_PEER && rand_count < 50) { + /* *Gossip node would be chosen from + * the array which would be randomly shuffled + * once all the nodes have been sent messages* + */ + targetid = find_gossip_target(comm); + /* If we didn't find any peers the needed retransmissions, select one randomly here. */ +/* targetid = myrand() % comm->remote_node_count; + rand_count += 1; +*/ + /* Don't select an already-used index. */ + for (j = 0; j < i; ++j) { + if (targetid == comm->selected[j]) { + printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d. 
selected[j=%d] is %d\n", targetid, j, comm->selected[j]); + targetid = NULL_PEER; + break; + } + } + if (targetid != NULL_PEER) { + if(comm->remote_limiters[targetid].reachability != REACHABLE) { + printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d. reachability is %d, and remote awol count is %d\n", + targetid, comm->remote_limiters[targetid].reachability, comm->remote_limiters[targetid].awol); + targetid = NULL_PEER; + } + } + + rand_count++; + } + + if (targetid < 0) { + /* Couldn't find a suitable peer to send to... */ + message_portion += 1; + printlog(LOG_DEBUG, "GOSSIP: exhausted random peer search.\n"); + continue; + } else { + printlog(LOG_DEBUG, "GOSSIP: settled on peer id %d.\n", targetid); + } + + remote = &comm->remote_limiters[targetid]; + comm->selected[i] = targetid; + + toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */ + toaddr.sin_port = remote->port; + + memset(&msg, 0, sizeof(message_t)); + msg.magic = MAGIC_MSG; + msg.ident_id = id; + msg.value = msg_value + remote->outgoing.saved_value; + msg.weight = msg_weight + remote->outgoing.saved_weight; + msg.seqno = remote->outgoing.next_seqno; + msg.min_seqno = remote->outgoing.first_seqno; + msg.type = MSG; + msg.view = comm->gossip.view; + if(comm->gossip.membership == SWIM) { + swim_comm_t *swim_comm = (swim_comm_t *)comm->membership_state; + // piggy back an update + if(swim_comm->count_updates > 0) { + int index = myrand() % swim_comm->count_updates; + update_t *pointer = swim_comm->updates; + update_t *prev_pointer = swim_comm->updates; + while(index != 0) { + prev_pointer = pointer; + pointer = pointer->next; + index--; + } + msg.update_present = TRUE; + msg.reachability = pointer->remote->reachability; + msg.incarnation = pointer->remote->incarnation; + msg.node.addr = pointer->remote->addr; + msg.node.port = pointer->remote->port; + printlog(LOG_DEBUG, "SWIM: Sending update about %s\n", inet_ntoa(*(struct in_addr *)&pointer->remote->addr)); + pointer->count++; + 
if(swim_comm->updates != pointer && pointer->count == UPDATE_THRESHOLD) { + // pointer is not head + prev_pointer->next = pointer->next; + pointer->next = NULL; + free(pointer); + swim_comm->count_updates--; + } else if(pointer->count == UPDATE_THRESHOLD) { + // pointer is head of the list + swim_comm->updates = pointer->next; + pointer->next = NULL; + free(pointer); + swim_comm->count_updates--; + if(swim_comm->count_updates == 0) { + swim_comm->updates = NULL; + } + } + } + } + remote->outgoing.next_seqno++; + remote->outgoing.saved_value += msg_value; + remote->outgoing.saved_weight += msg_weight; + /* Represents the number of messages we have sent to this peer without hearing a message or ACK from it. */ + remote->awol += 1; + + +#ifdef ALLOW_PARTITION + + if (do_partition && ((partition_set & (1 << targetid)) == 0)) { + printlog(LOG_DEBUG, "Partition: GOSSIP: ignoring targetid %d\n", targetid); + continue; + } + +#endif + + message_to_nbo(&msg); + + if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) { + printlog(LOG_WARN, "WARN: limiter_send_gossip: sento failed.\n"); + result = errno; + break; + } + printlog(LOG_DEBUG,"SWIM: sent the gossip to %s\n", inet_ntoa(*(struct in_addr *)&toaddr.sin_addr.s_addr)); + } + swim_send(comm, id, sock); + comm->gossip.value = msg_value * message_portion; + comm->gossip.weight = msg_weight * message_portion; + + return result; +} + +void swim_restart(comm_t *comm, int32_t view_number) { + /* Not sure about this yet... 
*/ +} + +int swim_init(comm_t *comm, uint32_t id) { + comm->membership_state = malloc(sizeof(swim_comm_t)); + if (comm->membership_state == NULL) { + return ENOMEM; + } + comm->connected = 1; + comm->recv_function = swim_receive; + comm->send_function = send_gossip_swim; + comm->restart_function = swim_restart; + + swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state; + + FILE *fp = fopen("/root/incarnation", "w+"); + fscanf(fp, "%d", &swim_comm->incarnation); + fprintf(fp, "%d", swim_comm->incarnation + 1); // next time a greater incarnation would be read + fflush(fp); + fclose(fp); + + return 0; +} + +void swim_teardown(comm_t *comm) { + if (comm->membership_state) + free(comm->membership_state); +} + +/**Bhanu: new functions being introduced*/ +/** Given a friend_id and the address of the suspected node + * this function sends a CHECK message to friend */ +/* + int help_from_friend(comm_t* comm, int friend_id, in_addr_t addr, in_port_t port, uint32_t id, int sock) { +// send a CHECK message to friend +int result; +message_t msg; +memset(&msg, 0, sizeof(message_t)); +msg.magic = MAGIC_MSG; +msg.ident_id = id; +msg.value = 0; +msg.weight = 0; +msg.seqno = 0; +msg.min_seqno = 0; +msg.type = CHECK; +msg.check_target = addr; +msg.check_port = port; + +// send the message +struct sockaddr_in toaddr; +memset(&toaddr, 0, sizeof(sockaddr_in)); +toaddr.sin_family = AF_INET; +toaddr.sin_addr.s_addr = comm->remote_limiters[friend_id].addr; +toaddr.sin_port = comm->remote_limiters[friend_id].port; +message_to_nbo(&msg); + +if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) { +printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n"); +result = errno; +printlog(LOG_WARN, " - The error was |%d|\n", strerror(result)); +} +} +*/ diff --git a/drl/swim.h b/drl/swim.h new file mode 100644 index 0000000..79b5559 --- /dev/null +++ b/drl/swim.h @@ -0,0 +1,67 @@ +/* See the DRL-LICENSE file for this file's software 
license. */ + +#ifndef _SWIM_H_ +#define _SWIM_H_ + +#define AWOL (0) +#define ALIVE (1) +#define MAX_FRIENDS (5) +#define UPDATE_THRESHOLD (3) +#define FRIEND_THRESHOLD (4) +#define SOURCE_THRESHOLD (8) + +static const uint16_t CHECK = 3; +static const uint16_t CHECK_ACK = 4; +static const uint16_t PING = 5; +static const uint16_t PING_ACK = 6; + +/** Structs needed to maintain the list of nodes to which +* ping messages have been sent. This list is maintained +* by one of the "friend" nodes +*/ +typedef struct ping_target { + // node which has been targetted + remote_node_t target; + // node which requested the ping target + // so that CHECK_ACK could be sent with ALIVE / AWOL + remote_node_t source; + // count of gossip rounds to keep timeout + uint32_t count; + // this has to be a list as well because we have to + // remember all the suspects a particular node has + // pinged and is waiting for a response + struct ping_target *next; +} ping_target_t; + +typedef struct update { + /*Remote limiter whose update this is*/ + remote_limiter_t *remote; + /*Number of times, update has been piggy + *backed on a gossip message*/ + int count; + struct update *next; +} update_t; + +typedef struct swim_comm { + /*Incarnation number of the node*/ + uint32_t incarnation; + + /*List of updates currently held by the node*/ + update_t *updates; + int count_updates; + + /** Keep track of the ping messages being sent and wait for timeout */ + ping_target_t *ping_targets; +} swim_comm_t; + +int swim_init(comm_t *comm, uint32_t id); + +void swim_teardown(comm_t *comm); + +int swim_receive(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg); + +int send_gossip_swim(comm_t *comm, uint32_t id, int sock); + +void swim_restart(comm_t *comm, int32_t view_number); + +#endif /* _SWIM_H_ */ diff --git a/drl/ulogd_DRL.c b/drl/ulogd_DRL.c index f9d6d56..e24b41c 100644 --- a/drl/ulogd_DRL.c +++ b/drl/ulogd_DRL.c @@ -140,8 +140,16 @@ static config_entry_t partition = { 
.u = { .value = 0xfffffff }, }; -static config_entry_t netem_slice = { +static config_entry_t sfq_slice = { .next = &partition, + .key = "sfq_slice", + .type = CONFIG_TYPE_STRING, + .options = CONFIG_OPT_NONE, + .u = { .string = "NONE" }, +}; + +static config_entry_t netem_slice = { + .next = &sfq_slice, .key = "netem_slice", .type = CONFIG_TYPE_STRING, .options = CONFIG_OPT_NONE, @@ -271,9 +279,9 @@ extern FILE *logfile; extern uint8_t system_loglevel; extern uint8_t do_enforcement; -/* From peer_comm.c - used to simulate partition. */ -extern int do_partition; -extern int partition_set; +/* Used to simulate partitions. */ +int do_partition = 0; +int partition_set = 0xfffffff; /* functions */ @@ -1141,6 +1149,23 @@ static inline int add_htb_netem(const char *iface, const uint32_t parent_major, return execute_cmd(cmd); } +static inline int add_htb_sfq(const char *iface, const uint32_t parent_major, + const uint32_t parent_minor, const uint32_t handle, + const int perturb) { /* Deletes the pfifo qdisc at parent_major:parent_minor on iface (a failed delete is only logged), then installs an sfq qdisc there with the given handle and perturb value. Returns execute_cmd's result for the replace command. */ + char cmd[300]; /* scratch buffer for the tc command line; both writes below are bounded by sizeof(cmd) so a long iface cannot overrun it */ + + snprintf(cmd, sizeof(cmd), "/sbin/tc qdisc del dev %s parent %x:%x handle %x pfifo", iface, parent_major, + parent_minor, handle); + printlog(LOG_WARN, "HTB_cmd: %s\n", cmd); + if (execute_cmd(cmd)) /* best effort: the delete may fail and we carry on regardless */ + printlog(LOG_WARN, "HTB_cmd: Previous deletion did not succeed.\n"); + + snprintf(cmd, sizeof(cmd), "/sbin/tc qdisc replace dev %s parent %x:%x handle %x sfq perturb %d", + iface, parent_major, parent_minor, handle, perturb); + printlog(LOG_WARN, "HTB_cmd: %s\n", cmd); + return execute_cmd(cmd); +} + static int create_htb_hierarchy(drl_instance_t *instance) { char cmd[300]; int i, j, k; @@ -1273,6 +1298,30 @@ static int create_htb_hierarchy(drl_instance_t *instance) { } } + /* Turn on SFQ for experimentation.
*/ + if (strcmp(sfq_slice.u.string, "NONE")) { + if (!strcmp(sfq_slice.u.string, "ALL")) { + if (add_htb_sfq("eth0", 1, 0x1000, 0x1000, 30)) + return 1; + if (add_htb_sfq("eth0", 1, 0x1fff, 0x1fff, 30)) + return 1; + + for (k = 0; k < instance->leaf_count; ++k) { + if (add_htb_sfq("eth0", 1, (0x1000 | instance->leaves[k].xid), + (0x1000 | instance->leaves[k].xid), 30)) { + return 1; + } + } + } else { + uint32_t slice_xid; + + sscanf(sfq_slice.u.string, "%x", &slice_xid); + + if (add_htb_sfq("eth0", 1, slice_xid, slice_xid, 30)) + return 1; + } + } + return 0; } diff --git a/drl/zk_drl.c b/drl/zk_drl.c new file mode 100644 index 0000000..a9fd92d --- /dev/null +++ b/drl/zk_drl.c @@ -0,0 +1,346 @@ +/* See the DRL-LICENSE file for this file's software license. */ + +#ifdef BUILD_ZOOKEEPER + +#include +#include +#include +#include +#include +#include +#include + +#include "raterouter.h" +#include "ratetypes.h" +#include "drl_state.h" +#include "peer_comm.h" +#include "zk_drl.h" +#include "logging.h" + +#define NULL_LEN (-1) +#define PATH_LEN (64) +#define PATH_BUFFER_LEN (64) + +static int32_t read_path_cversion(zhandle_t *zkhandle, const char *path) { + struct Stat stat; + int zoo_result; + + memset(&stat, 0, sizeof(struct Stat)); + + zoo_result = zoo_exists(zkhandle, path, 0, &stat); + + if (zoo_result != ZOK) { + return -1; + } + + return stat.cversion; +} + +static int process_membership_change(zhandle_t *zkhandle, zkdrlcontext_t *context, const char *path) { + struct String_vector children; + int32_t view_before = 0; + int32_t view_after = view_before + 1; //Needs to be != to view_before + int zoo_result = 0; + int i; + + while (view_before != view_after) { + view_before = read_path_cversion(zkhandle, path); + + zoo_result = zoo_get_children(zkhandle, path, 1, &children); + if (zoo_result != ZOK) { + return zoo_result; + } + + view_after = read_path_cversion(zkhandle, path); + } + + if (view_after > context->comm->gossip.view) { + printlog(LOG_DEBUG, "ZK:zookeeper 
watch says we need to restart with a new view.\n"); + context->comm->restart_function(context->comm, view_after); + } + + /* Clear the remote limiter list. This will be overwritten below for + * limiters that are found to be in the new view. */ + for (i = 0; i < context->comm->remote_node_count; ++i) { + context->comm->remote_limiters[i].reachability = UNREACHABLE; + context->comm->remote_limiters[i].view = view_after; + context->comm->remote_limiters[i].view_confidence = NOTIN; + } + + for (i = 0; i < children.count; ++i) { + remote_limiter_t *remote_limiter = NULL; + remote_node_t remote_node; + + memset(&remote_node, 0, sizeof(remote_node_t)); + + printlog(LOG_DEBUG, "ZK:children.data[%d] is %s\n", i, children.data[i]); + + sscanf(children.data[i], "%u", &remote_node.addr); + remote_node.port = htons(LIMITER_LISTEN_PORT); + + if (remote_node.addr != context->local_addr) { + printlog(LOG_DEBUG, "ZK:searching map for %u:%u\n", remote_node.addr, remote_node.port); + remote_limiter = map_search(context->comm->remote_node_map, &remote_node, sizeof(remote_node_t)); + assert(remote_limiter != NULL); + remote_limiter->reachability = REACHABLE; + remote_limiter->view_confidence = IN; + } else { + printlog(LOG_DEBUG, "ZK: %u is my own addr.\n", remote_node.addr); + } + } + + assert(view_after >= 0); + + context->comm->connected = 1; + + return ZOK; +} + +static void zk_connected(zhandle_t *zkhandle, zkdrlcontext_t *context) { + char path[PATH_LEN]; + char path_buffer[PATH_BUFFER_LEN]; + int zoo_result = 0; + + printlog(LOG_DEBUG, "ZK:(Re)Connected to zookeeper.\n"); + + sprintf(path, "/%u", context->id); + + zoo_result = zoo_create(zkhandle, path, NULL, NULL_LEN, &ZOO_OPEN_ACL_UNSAFE, 0, path_buffer, PATH_BUFFER_LEN); + + if (zoo_result == ZOK) { + printlog(LOG_DEBUG, "ZK: created path %s\n", path); + } else { + //An error occurred. It was probably already there. 
+ printlog(LOG_DEBUG, "ZK: error creating path %s: %d\n", path, zoo_result); + } + + sprintf(path, "/%u/%u", context->id, context->local_addr); + + zoo_result = zoo_create(zkhandle, path, NULL, NULL_LEN, &ZOO_READ_ACL_UNSAFE, ZOO_EPHEMERAL, path_buffer, PATH_BUFFER_LEN); + + if (zoo_result == ZOK) { + printlog(LOG_DEBUG, "ZK: created path %s\n", path); + } else { + printlog(LOG_DEBUG, "ZK: error creating path %s: %d\n", path, zoo_result); + } + + sprintf(path, "/%u", context->id); + + zoo_result = process_membership_change(zkhandle, context, path); + + if (zoo_result != ZOK) { + printlog(LOG_WARN, "ZK: process_membership_change failed?\n"); + } +} + +static void zk_disconnected(zhandle_t *zkhandle, zkdrlcontext_t *context) { + printlog(LOG_DEBUG, "ZK:Disconnected from zookeeper.\n"); + + context->comm->connected = 0; +} + +static void zk_membership_change(zhandle_t *zkhandle, const char *path, zkdrlcontext_t *context) { + int zoo_result = 0; + + printlog(LOG_DEBUG, "ZK:zookeeper child list changed.\n"); + + zoo_result = process_membership_change(zkhandle, context, path); +} + +void zk_drl_restart(comm_t *comm, int32_t view_number) { + int i; + + comm->gossip.value = comm->local_rate; + comm->gossip.weight = 1.0; + comm->gossip.view = view_number; + + for (i = 0; i < comm->remote_node_count; ++i) { + if (comm->remote_limiters[i].view < view_number) { + comm->remote_limiters[i].rate = 0; + memset(&comm->remote_limiters[i].incoming, 0, sizeof(in_neighbor_t)); + memset(&comm->remote_limiters[i].outgoing, 0, sizeof(out_neighbor_t)); + comm->remote_limiters[i].view = view_number; + comm->remote_limiters[i].view_confidence = UNSURE; + } + } + + printlog(LOG_DEBUG, "ZK: Changing view to %d\n", view_number); +} + +static void zk_drl_watcher(zhandle_t *zkhandle, int type, int state, const char *path, void *context_ptr) { + zkdrlcontext_t *context = (zkdrlcontext_t *) context_ptr; + + pthread_rwlock_rdlock(context->limiter_lock); + pthread_mutex_lock(&context->comm->lock); + 
+ if (type == ZOO_SESSION_EVENT) { + if (state == ZOO_CONNECTED_STATE) { + /* We're newly connected - set that watch! */ + zk_connected(zkhandle, context); + } else if (state == ZOO_CONNECTING_STATE) { + /* We're no longer connected. Do something safe. */ + zk_disconnected(zkhandle, context); + } else if (state == ZOO_EXPIRED_SESSION_STATE) { + printlog(LOG_DEBUG, "ZK:zookeeper session expired - reconnecting.\n"); + context->zkhandle = zookeeper_init(context->zk_host, zk_drl_watcher, 10000, NULL, context, 0); /* NOTE(review): the expired handle is overwritten without zookeeper_close and the result is not checked for NULL -- confirm this is intended */ + } else { + printlog(LOG_DEBUG, "ZK:Unhandled event zk_drl_watcher: type is %d, state is %d, path is %s\n", type, state, path ? path : "(null)"); /* zookeeper.h documents path as NULL for session events, so never hand it to %s unguarded */ + } + } else if (type == ZOO_CHILD_EVENT) { + /* The list of child nodes in the group has changed. Re-read the + * group membership list and re-set the watch. */ + zk_membership_change(zkhandle, path, context); + } else { + printlog(LOG_DEBUG, "ZK:Unhandled event zk_drl_watcher: type is %d, state is %d, path is %s\n", type, state, path ? path : "(null)"); + } + + pthread_mutex_unlock(&context->comm->lock); + pthread_rwlock_unlock(context->limiter_lock); +} +/* Handles one gossip message from remote, dispatching on msg->type (ACK, MSG, UNSUREACK, NACK) and on how msg->view compares with comm->gossip.view. Always returns 0; sock is not used. */ +int zk_drl_recv(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) { + if (msg->type == ACK) { + /* If ACK was received then reset the awol count */ + if (msg->view == comm->gossip.view && msg->seqno == remote->outgoing.next_seqno - 1) { + /* Ack for most recent message. Clear saved state. */ + remote->outgoing.first_seqno = remote->outgoing.next_seqno; + remote->outgoing.saved_value = 0; + remote->outgoing.saved_weight = 0; + remote->awol = 0; + + } else if (msg->view > comm->gossip.view) { + printlog(LOG_DEBUG, "ZK:Received ack for newer view, restarting.\n"); + comm->restart_function(comm, msg->view); + remote->view_confidence = IN; + remote->awol = 0; + } + /* Ignore ack if it isn't for most recent message or its from an old view.
*/ + } else if (msg->type == MSG) { + if (msg->view == comm->gossip.view) { + if (msg->min_seqno > remote->incoming.seen_seqno) { /* NOTE(review): presumably min_seqno is the oldest seqno whose contribution is still folded into this message -- confirm against the sender */ + /* Entirely new information */ + remote->incoming.seen_seqno = msg->seqno; + remote->incoming.saved_value = msg->value; + remote->incoming.saved_weight = msg->weight; + comm->gossip.value += msg->value; + comm->gossip.weight += msg->weight; + send_ack(id, remote, msg->seqno, ACK, comm->gossip.view); + remote->awol = 0; + } + else if (msg->seqno > remote->incoming.seen_seqno) { + /* Only some of the message is old news. */ + double diff_value = msg->value - remote->incoming.saved_value; /* apply only the change relative to what was saved from the last accepted message */ + double diff_weight = msg->weight - remote->incoming.saved_weight; + + remote->incoming.seen_seqno = msg->seqno; + remote->incoming.saved_value = msg->value; + remote->incoming.saved_weight = msg->weight; + + comm->gossip.value += diff_value; + comm->gossip.weight += diff_weight; + send_ack(id, remote, msg->seqno, ACK, comm->gossip.view); + remote->awol = 0; + } + else { + /* The entire message is old news. (Duplicate). */ + /* Do nothing. */ + } + } else if (msg->view > comm->gossip.view) { + printlog(LOG_DEBUG, "ZK:received message with a newer viewstamp, restarting.\n"); + comm->restart_function(comm, msg->view); /* adopt the sender's newer view before applying its value and weight */ + remote->view_confidence = IN; + + remote->incoming.seen_seqno = msg->seqno; + remote->incoming.saved_value = msg->value; + remote->incoming.saved_weight = msg->weight; + comm->gossip.value += msg->value; + comm->gossip.weight += msg->weight; + send_ack(id, remote, msg->seqno, ACK, comm->gossip.view); + remote->awol = 0; + } else if (msg->view < comm->gossip.view) { + printlog(LOG_DEBUG, "ZK:received a message with an older viewstamp.\n"); + if (remote->view_confidence == IN) { + /* The sender is in the new view and doesn't know it yet. */ + send_ack(id, remote, msg->seqno, ACK, comm->gossip.view); + } else if (remote->view_confidence == UNSURE) { + /* We don't know if he's in or not.
*/ + send_ack(id, remote, msg->seqno, UNSUREACK, comm->gossip.view); + } else if (remote->view_confidence == NOTIN) { + /* He's out of luck... */ + send_ack(id, remote, msg->seqno, NACK, comm->gossip.view); + } + remote->awol = 0; + } + } else if (msg->type == UNSUREACK) { + /* We received an ack, but the ack sender was unsure whether or not + * we'll be a part of its new view. Can't do much here... */ + if (msg->view > comm->gossip.view) { + remote->view = msg->view; + remote->view_confidence = IN; + remote->awol = 0; + printlog(LOG_DEBUG, "ZK:received an UNSUREACK for view %d\n", msg->view); + } + } else if (msg->type == NACK) { + if (msg->view > comm->gossip.view) { + remote->view = msg->view; + remote->view_confidence = IN; + remote->awol = 0; + + comm->connected = 0; /* the same flag zk_disconnected clears */ + printlog(LOG_DEBUG, "ZK:received a NACK for view %d\n", msg->view); + } + } + + return 0; +} +/* Allocates the zookeeper context for comm, stores it in comm->membership_state and starts the zookeeper session. Returns 0, ENOMEM if the context cannot be allocated, or EINVAL if zookeeper_init returns NULL. */ +int zk_drl_init(comm_t *comm, uint32_t id, limiter_t *limiter, ident_config *config) { + zkdrlcontext_t *context = NULL; + comm->connected = 0; + + if ((context = malloc(sizeof(zkdrlcontext_t))) == NULL) { + return ENOMEM; + } + + context->zk_host = config->zk_host; /* the pointer is shared with config, not copied; NOTE(review): zk_drl_close frees it -- confirm config does not free it as well */ + context->limiter_lock = &limiter->limiter_lock; + context->comm = comm; + context->id = id; + context->local_addr = limiter->localaddr; + comm->membership_state = context; /* zk_drl_close reads the context back from here */ + + printlog(LOG_DEBUG, "ZK: Calling zk init\n"); + + context->zkhandle = zookeeper_init(context->zk_host, zk_drl_watcher, 10000, NULL, context, 0); /* NOTE(review): the recv/send/restart function pointers are assigned only after this call returns; confirm the watcher cannot reach restart_function before then */ + + if (context->zkhandle == NULL) { + printlog(LOG_CRITICAL, "ZK: docs say that this can fail, but they don't say why. 
:( Errno is %d\n", errno); + return EINVAL; + } + + comm->recv_function = zk_drl_recv; + comm->send_function = send_udp_gossip; + comm->restart_function = zk_drl_restart; + + return 0; +} +/* Closes the zookeeper handle and frees the context stored in comm->membership_state, leaving that pointer NULL. Does nothing when no context is attached. Always returns 0. */ +int zk_drl_close(comm_t *comm) { + zkdrlcontext_t *context = (zkdrlcontext_t *) comm->membership_state; + + if (context == NULL) { /* check before the first dereference: nothing attached, nothing to tear down */ + return 0; + } + + zookeeper_close(context->zkhandle); + if (context->zk_host) { + free(context->zk_host); /* NOTE(review): zk_drl_init stores config->zk_host here without copying -- confirm the config does not free it too */ + context->zk_host = NULL; + } + + free(context); + comm->membership_state = NULL; + return 0; +} + +#endif /* BUILD_ZOOKEEPER */ diff --git a/drl/zk_drl.h b/drl/zk_drl.h new file mode 100644 index 0000000..12ac936 --- /dev/null +++ b/drl/zk_drl.h @@ -0,0 +1,39 @@ +/* See the DRL-LICENSE file for this file's software license. */ + +#ifndef _ZK_DRL_ +#define _ZK_DRL_ + +#define _XOPEN_SOURCE 600 /* NOTE(review): a feature-test macro only takes effect if it precedes the first system header in the translation unit -- confirm include order in users of this header */ + +#include +#include +#include + +#include + +static const uint16_t UNSUREACK = 3; /* ack types zk_drl_recv sends in reply to a message from an older view */ +static const uint16_t NACK = 4; + +typedef struct zkdrlcontext { + /** The host string that should be passed to zookeeper_init when using + * zookeeper. This consists of comma-separated ipaddr:port pairs. Example: + * "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" */ + char *zk_host; + + pthread_rwlock_t *limiter_lock; + comm_t *comm; + uint32_t id; + in_addr_t local_addr; + + zhandle_t *zkhandle; +} zkdrlcontext_t; + +int zk_drl_init(comm_t *comm, uint32_t id, limiter_t *limiter, ident_config *config); + +int zk_drl_close(comm_t *comm); + +int zk_drl_recv(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg); + +void zk_drl_restart(comm_t *comm, int32_t view_number); + +#endif /* _ZK_DRL_ */ diff --git a/include/zookeeper/recordio.h b/include/zookeeper/recordio.h new file mode 100644 index 0000000..33b8c70 --- /dev/null +++ b/include/zookeeper/recordio.h @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __RECORDIO_H__ +#define __RECORDIO_H__ + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +struct buffer { + int32_t len; + char *buff; +}; + +void deallocate_String(char **s); +void deallocate_Buffer(struct buffer *b); +void deallocate_vector(void *d); +struct iarchive { + int (*start_record)(struct iarchive *ia, const char *tag); + int (*end_record)(struct iarchive *ia, const char *tag); + int (*start_vector)(struct iarchive *ia, const char *tag, int32_t *count); + int (*end_vector)(struct iarchive *ia, const char *tag); + int (*deserialize_Bool)(struct iarchive *ia, const char *name, int32_t *); + int (*deserialize_Int)(struct iarchive *ia, const char *name, int32_t *); + int (*deserialize_Long)(struct iarchive *ia, const char *name, int64_t *); + int (*deserialize_Buffer)(struct iarchive *ia, const char *name, + struct buffer *); + int (*deserialize_String)(struct iarchive *ia, const char *name, char **); + void *priv; +}; +struct oarchive { + int (*start_record)(struct oarchive *oa, const char *tag); + int (*end_record)(struct oarchive *oa, const char *tag); + int (*start_vector)(struct oarchive *oa, const char *tag, const int32_t *count); + int (*end_vector)(struct oarchive *oa, const char *tag); + int (*serialize_Bool)(struct oarchive *oa, const char 
*name, const int32_t *); + int (*serialize_Int)(struct oarchive *oa, const char *name, const int32_t *); + int (*serialize_Long)(struct oarchive *oa, const char *name, + const int64_t *); + int (*serialize_Buffer)(struct oarchive *oa, const char *name, + const struct buffer *); + int (*serialize_String)(struct oarchive *oa, const char *name, char **); + void *priv; +}; + +struct oarchive *create_buffer_oarchive(void); +void close_buffer_oarchive(struct oarchive **oa, int free_buffer); +struct iarchive *create_buffer_iarchive(char *buffer, int len); +void close_buffer_iarchive(struct iarchive **ia); +char *get_buffer(struct oarchive *); +int get_buffer_len(struct oarchive *); + +int64_t htonll(int64_t v); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/include/zookeeper/zookeeper.h b/include/zookeeper/zookeeper.h new file mode 100644 index 0000000..e69f7ce --- /dev/null +++ b/include/zookeeper/zookeeper.h @@ -0,0 +1,1249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ZOOKEEPER_H_ +#define ZOOKEEPER_H_ + +#include +#include + +#include "zookeeper_version.h" +#include "recordio.h" +#include "zookeeper.jute.h" + +/** + * \file zookeeper.h + * \brief ZooKeeper functions and definitions. 
+ * + * ZooKeeper is a network service that may be backed by a cluster of + * synchronized servers. The data in the service is represented as a tree + * of data nodes. Each node has data, children, an ACL, and status information. + * The data for a node is read and written in its entirety. + * + * ZooKeeper clients can leave watches when they query the data or children + * of a node. If a watch is left, that client will be notified of the change. + * The notification is a one time trigger. Subsequent changes to the node will + * not trigger a notification unless the client issues a query with the watch + * flag set. If the client is ever disconnected from the service, even if the + * disconnection is temporary, the watches of the client will be removed from + * the service, so a client must treat a disconnect notification as an implicit + * trigger of all outstanding watches. + * + * When a node is created, it may be flagged as an ephemeral node. Ephemeral + * nodes are automatically removed when a client session is closed or when + * a session times out due to inactivity (the ZooKeeper runtime fills in + * periods of inactivity with pings). Ephemeral nodes cannot have children. + * + * ZooKeeper clients are identified by a server assigned session id. For + * security reasons the server + * also generates a corresponding password for a session. A client may save its + * id and corresponding password to persistent storage in order to use the + * session across program invocation boundaries. + */ + +/* Support for building on various platforms */ + +// on cygwin we should take care of exporting/importing symbols properly +#ifdef DLL_EXPORT +# define ZOOAPI __declspec(dllexport) +#else +# if defined(__CYGWIN__) && !defined(USE_STATIC_LIB) +# define ZOOAPI __declspec(dllimport) +# else +# define ZOOAPI +# endif +#endif + +/** zookeeper return constants **/ + +enum ZOO_ERRORS { + ZOK = 0, /*!< Everything is OK */ + + /** System and server-side errors.
+ * This is never thrown by the server, it shouldn't be used other than + * to indicate a range. Specifically error codes greater than this + * value, but lesser than {@link #ZAPIERROR}, are system errors. */ + ZSYSTEMERROR = -1, + ZRUNTIMEINCONSISTENCY = -2, /*!< A runtime inconsistency was found */ + ZDATAINCONSISTENCY = -3, /*!< A data inconsistency was found */ + ZCONNECTIONLOSS = -4, /*!< Connection to the server has been lost */ + ZMARSHALLINGERROR = -5, /*!< Error while marshalling or unmarshalling data */ + ZUNIMPLEMENTED = -6, /*!< Operation is unimplemented */ + ZOPERATIONTIMEOUT = -7, /*!< Operation timeout */ + ZBADARGUMENTS = -8, /*!< Invalid arguments */ + ZINVALIDSTATE = -9, /*!< Invliad zhandle state */ + + /** API errors. + * This is never thrown by the server, it shouldn't be used other than + * to indicate a range. Specifically error codes greater than this + * value are API errors (while values less than this indicate a + * {@link #ZSYSTEMERROR}). + */ + ZAPIERROR = -100, + ZNONODE = -101, /*!< Node does not exist */ + ZNOAUTH = -102, /*!< Not authenticated */ + ZBADVERSION = -103, /*!< Version conflict */ + ZNOCHILDRENFOREPHEMERALS = -108, /*!< Ephemeral nodes may not have children */ + ZNODEEXISTS = -110, /*!< The node already exists */ + ZNOTEMPTY = -111, /*!< The node has children */ + ZSESSIONEXPIRED = -112, /*!< The session has been expired by the server */ + ZINVALIDCALLBACK = -113, /*!< Invalid callback specified */ + ZINVALIDACL = -114, /*!< Invalid ACL specified */ + ZAUTHFAILED = -115, /*!< Client authentication failed */ + ZCLOSING = -116, /*!< ZooKeeper is closing */ + ZNOTHING = -117, /*!< (not error) no server responses to process */ + ZSESSIONMOVED = -118 /*! + * The legacy style, an application wishing to receive events from ZooKeeper must + * first implement a function with this signature and pass a pointer to the function + * to \ref zookeeper_init. 
Next, the application sets a watch by calling one of + * the getter API that accept the watch integer flag (for example, \ref zoo_aexists, + * \ref zoo_get, etc). + *

+ * The watcher object style uses an instance of a "watcher object" which in + * the C world is represented by a pair: a pointer to a function implementing this + * signature and a pointer to watcher context -- handback user-specific data. + * When a watch is triggered this function will be called along with + * the watcher context. An application wishing to use this style must use + * the getter API functions with the "w" prefix in their names (for example, \ref + * zoo_awexists, \ref zoo_wget, etc). + * + * \param zh zookeeper handle + * \param type event type. This is one of the *_EVENT constants. + * \param state connection state. The state value will be one of the *_STATE constants. + * \param path znode path for which the watcher is triggered. NULL if the event + * type is ZOO_SESSION_EVENT + * \param watcherCtx watcher context. + */ +typedef void (*watcher_fn)(zhandle_t *zh, int type, + int state, const char *path,void *watcherCtx); + +/** + * \brief create a handle to used communicate with zookeeper. + * + * This method creates a new handle and a zookeeper session that corresponds + * to that handle. Session establishment is asynchronous, meaning that the + * session should not be considered established until (and unless) an + * event of state ZOO_CONNECTED_STATE is received. + * \param host comma separated host:port pairs, each corresponding to a zk + * server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" + * \param fn the global watcher callback function. When notifications are + * triggered this function will be invoked. + * \param clientid the id of a previously established session that this + * client will be reconnecting to. Pass 0 if not reconnecting to a previous + * session. Clients can access the session id of an established, valid, + * connection by calling \ref zoo_client_id. 
If the session corresponding to + * the specified clientid has expired, or if the clientid is invalid for + * any reason, the returned zhandle_t will be invalid -- the zhandle_t + * state will indicate the reason for failure (typically + * ZOO_EXPIRED_SESSION_STATE). + * \param context the handback object that will be associated with this instance + * of zhandle_t. Application can access it (for example, in the watcher + * callback) using \ref zoo_get_context. The object is not used by zookeeper + * internally and can be null. + * \param flags reserved for future use. Should be set to zero. + * \return a pointer to the opaque zhandle structure. If it fails to create + * a new zhandle the function returns NULL and the errno variable + * indicates the reason. + */ +ZOOAPI zhandle_t *zookeeper_init(const char *host, watcher_fn fn, + int recv_timeout, const clientid_t *clientid, void *context, int flags); + +/** + * \brief close the zookeeper handle and free up any resources. + * + * After this call, the client session will no longer be valid. The function + * will flush any outstanding send requests before return. As a result it may + * block. + * + * This method should only be called only once on a zookeeper handle. Calling + * twice will cause undefined (and probably undesirable behavior). Calling any other + * zookeeper method after calling close is undefined behaviour and should be avoided. + * + * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init + * \return a result code. Regardless of the error code returned, the zhandle + * will be destroyed and all resources freed. + * + * ZOK - success + * ZBADARGUMENTS - invalid input parameters + * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory + * ZOPERATIONTIMEOUT - failed to flush the buffers within the specified timeout. 
+ * ZCONNECTIONLOSS - a network error occured while attempting to send request to server + * ZSYSTEMERROR -- a system (OS) error occured; it's worth checking errno to get details + */ +ZOOAPI int zookeeper_close(zhandle_t *zh); + +/** + * \brief return the client session id, only valid if the connections + * is currently connected (ie. last watcher state is ZOO_CONNECTED_STATE) + */ +ZOOAPI const clientid_t *zoo_client_id(zhandle_t *zh); + +ZOOAPI int zoo_recv_timeout(zhandle_t *zh); + +ZOOAPI const void *zoo_get_context(zhandle_t *zh); + +ZOOAPI void zoo_set_context(zhandle_t *zh, void *context); + +/** + * \brief set a watcher function + * \return previous watcher function + */ +ZOOAPI watcher_fn zoo_set_watcher(zhandle_t *zh,watcher_fn newFn); + +#ifndef THREADED +/** + * \brief Returns the events that zookeeper is interested in. + * + * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init + * \param fd is the file descriptor of interest + * \param interest is an or of the ZOOKEEPER_WRITE and ZOOKEEPER_READ flags to + * indicate the I/O of interest on fd. + * \param tv a timeout value to be used with select/poll system call + * \return a result code. + * ZOK - success + * ZBADARGUMENTS - invalid input parameters + * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE + * ZCONNECTIONLOSS - a network error occured while attempting to establish + * a connection to the server + * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory + * ZOPERATIONTIMEOUT - hasn't received anything from the server for 2/3 of the + * timeout value specified in zookeeper_init() + * ZSYSTEMERROR -- a system (OS) error occured; it's worth checking errno to get details + */ +ZOOAPI int zookeeper_interest(zhandle_t *zh, int *fd, int *interest, + struct timeval *tv); + +/** + * \brief Notifies zookeeper that an event of interest has happened. 
+ * + * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init + * \param events will be an OR of the ZOOKEEPER_WRITE and ZOOKEEPER_READ flags. + * \return a result code. + * ZOK - success + * ZBADARGUMENTS - invalid input parameters + * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE + * ZCONNECTIONLOSS - a network error occured while attempting to send request to server + * ZSESSIONEXPIRED - connection attempt failed -- the session's expired + * ZAUTHFAILED - authentication request failed, e.i. invalid credentials + * ZRUNTIMEINCONSISTENCY - a server response came out of order + * ZSYSTEMERROR -- a system (OS) error occured; it's worth checking errno to get details + * ZNOTHING -- not an error; simply indicates that there no more data from the server + * to be processed (when called with ZOOKEEPER_READ flag). + */ +ZOOAPI int zookeeper_process(zhandle_t *zh, int events); +#endif + +/** + * \brief signature of a completion function for a call that returns void. + * + * This method will be invoked at the end of a asynchronous call and also as + * a result of connection loss or timeout. + * \param rc the error code of the call. Connection loss/timeout triggers + * the completion with one of the following error codes: + * ZCONNECTIONLOSS -- lost connection to the server + * ZOPERATIONTIMEOUT -- connection timed out + * Data related events trigger the completion with error codes listed the + * Exceptions section of the documentation of the function that initiated the + * call. (Zero indicates call was successful.) + * \param data the pointer that was passed by the caller when the function + * that this completion corresponds to was invoked. The programmer + * is responsible for any memory freeing associated with the data + * pointer. + */ +typedef void (*void_completion_t)(int rc, const void *data); + +/** + * \brief signature of a completion function that returns a Stat structure. 
+ * + * This method will be invoked at the end of a asynchronous call and also as + * a result of connection loss or timeout. + * \param rc the error code of the call. Connection loss/timeout triggers + * the completion with one of the following error codes: + * ZCONNECTIONLOSS -- lost connection to the server + * ZOPERATIONTIMEOUT -- connection timed out + * Data related events trigger the completion with error codes listed the + * Exceptions section of the documentation of the function that initiated the + * call. (Zero indicates call was successful.) + * \param stat a pointer to the stat information for the node involved in + * this function. If a non zero error code is returned, the content of + * stat is undefined. The programmer is NOT responsible for freeing stat. + * \param data the pointer that was passed by the caller when the function + * that this completion corresponds to was invoked. The programmer + * is responsible for any memory freeing associated with the data + * pointer. + */ +typedef void (*stat_completion_t)(int rc, const struct Stat *stat, + const void *data); + +/** + * \brief signature of a completion function that returns data. + * + * This method will be invoked at the end of a asynchronous call and also as + * a result of connection loss or timeout. + * \param rc the error code of the call. Connection loss/timeout triggers + * the completion with one of the following error codes: + * ZCONNECTIONLOSS -- lost connection to the server + * ZOPERATIONTIMEOUT -- connection timed out + * Data related events trigger the completion with error codes listed the + * Exceptions section of the documentation of the function that initiated the + * call. (Zero indicates call was successful.) + * \param value the value of the information returned by the asynchronous call. + * If a non zero error code is returned, the content of value is undefined. + * The programmer is NOT responsible for freeing value. + * \param value_len the number of bytes in value. 
+ * \param stat a pointer to the stat information for the node involved in + * this function. If a non zero error code is returned, the content of + * stat is undefined. The programmer is NOT responsible for freeing stat. + * \param data the pointer that was passed by the caller when the function + * that this completion corresponds to was invoked. The programmer + * is responsible for any memory freeing associated with the data + * pointer. + */ +typedef void (*data_completion_t)(int rc, const char *value, int value_len, + const struct Stat *stat, const void *data); + +/** + * \brief signature of a completion function that returns a list of strings. + * + * This method will be invoked at the end of an asynchronous call and also as + * a result of connection loss or timeout. + * \param rc the error code of the call. Connection loss/timeout triggers + * the completion with one of the following error codes: + * ZCONNECTIONLOSS -- lost connection to the server + * ZOPERATIONTIMEOUT -- connection timed out + * Data related events trigger the completion with error codes listed in the + * Exceptions section of the documentation of the function that initiated the + * call. (Zero indicates call was successful.) + * \param strings a pointer to the structure containing the list of strings of the + * names of the children of a node. If a non zero error code is returned, + * the content of strings is undefined. The programmer is NOT responsible + * for freeing strings. + * \param data the pointer that was passed by the caller when the function + * that this completion corresponds to was invoked. The programmer + * is responsible for any memory freeing associated with the data + * pointer. + */ +typedef void (*strings_completion_t)(int rc, + const struct String_vector *strings, const void *data); + +/** + * \brief signature of a completion function that returns a string.
+ * + * This method will be invoked at the end of an asynchronous call and also as + * a result of connection loss or timeout. + * \param rc the error code of the call. Connection loss/timeout triggers + * the completion with one of the following error codes: + * ZCONNECTIONLOSS -- lost connection to the server + * ZOPERATIONTIMEOUT -- connection timed out + * Data related events trigger the completion with error codes listed in the + * Exceptions section of the documentation of the function that initiated the + * call. (Zero indicates call was successful.) + * \param value the value of the string returned. + * \param data the pointer that was passed by the caller when the function + * that this completion corresponds to was invoked. The programmer + * is responsible for any memory freeing associated with the data + * pointer. + */ +typedef void + (*string_completion_t)(int rc, const char *value, const void *data); + +/** + * \brief signature of a completion function that returns an ACL. + * + * This method will be invoked at the end of an asynchronous call and also as + * a result of connection loss or timeout. + * \param rc the error code of the call. Connection loss/timeout triggers + * the completion with one of the following error codes: + * ZCONNECTIONLOSS -- lost connection to the server + * ZOPERATIONTIMEOUT -- connection timed out + * Data related events trigger the completion with error codes listed in the + * Exceptions section of the documentation of the function that initiated the + * call. (Zero indicates call was successful.) + * \param acl a pointer to the structure containing the ACL of a node. If a non + * zero error code is returned, the content of acl is undefined. The + * programmer is NOT responsible for freeing acl. + * \param stat a pointer to the stat information for the node involved in + * this function. If a non zero error code is returned, the content of + * stat is undefined. The programmer is NOT responsible for freeing stat.
+ * \param data the pointer that was passed by the caller when the function + * that this completion corresponds to was invoked. The programmer + * is responsible for any memory freeing associated with the data + * pointer. + */ +typedef void (*acl_completion_t)(int rc, struct ACL_vector *acl, + struct Stat *stat, const void *data); + +/** + * \brief get the state of the zookeeper connection. + * + * The return value will be one of the \ref State Consts. + */ +ZOOAPI int zoo_state(zhandle_t *zh); + +/** + * \brief create a node. + * + * This method will create a node in ZooKeeper. A node can only be created if + * it does not already exist. The Create Flags affect the creation of nodes. + * If ZOO_EPHEMERAL flag is set, the node will automatically get removed if the + * client session goes away. If the ZOO_SEQUENCE flag is set, a unique + * monotonically increasing sequence number is appended to the path name. + * + * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init + * \param path The name of the node. Expressed as a file name with slashes + * separating ancestors of the node. + * \param value The data to be stored in the node. + * \param valuelen The number of bytes in data. + * \param acl The initial ACL of the node. If null, the ACL of the parent will be + * used. + * \param flags this parameter can be set to 0 for normal create or an OR + * of the Create Flags + * \param completion the routine to invoke when the request completes. The completion + * will be triggered with one of the following codes passed in as the rc argument: + * ZOK operation completed successfully + * ZNONODE the parent node does not exist. + * ZNODEEXISTS the node already exists + * ZNOAUTH the client does not have permission. + * ZNOCHILDRENFOREPHEMERALS cannot create children of ephemeral nodes. + * \param data The data that will be passed to the completion routine when the + * function completes.
+ * \return ZOK on success or one of the following errcodes on failure: + * ZBADARGUMENTS - invalid input parameters + * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE + * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory + */ +ZOOAPI int zoo_acreate(zhandle_t *zh, const char *path, const char *value, + int valuelen, const struct ACL_vector *acl, int flags, + string_completion_t completion, const void *data); + +/** + * \brief delete a node in zookeeper. + * + * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init + * \param path the name of the node. Expressed as a file name with slashes + * separating ancestors of the node. + * \param version the expected version of the node. The function will fail if the + * actual version of the node does not match the expected version. + * If -1 is used the version check will not take place. + * \param completion the routine to invoke when the request completes. The completion + * will be triggered with one of the following codes passed in as the rc argument: + * ZOK operation completed successfully + * ZNONODE the node does not exist. + * ZNOAUTH the client does not have permission. + * ZBADVERSION expected version does not match actual version. + * ZNOTEMPTY children are present; node cannot be deleted. + * \param data the data that will be passed to the completion routine when + * the function completes. + * \return ZOK on success or one of the following errcodes on failure: + * ZBADARGUMENTS - invalid input parameters + * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE + * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory + */ +ZOOAPI int zoo_adelete(zhandle_t *zh, const char *path, int version, + void_completion_t completion, const void *data); + +/** + * \brief checks the existence of a node in zookeeper.
+ * + * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init + * \param path the name of the node. Expressed as a file name with slashes + * separating ancestors of the node. + * \param watch if nonzero, a watch will be set at the server to notify the + * client if the node changes. The watch will be set even if the node does not + * exist. This allows clients to watch for nodes to appear. + * \param completion the routine to invoke when the request completes. The completion + * will be triggered with one of the following codes passed in as the rc argument: + * ZOK operation completed successfully + * ZNONODE the node does not exist. + * ZNOAUTH the client does not have permission. + * \param data the data that will be passed to the completion routine when the + * function completes. + * \return ZOK on success or one of the following errcodes on failure: + * ZBADARGUMENTS - invalid input parameters + * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE + * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory + */ +ZOOAPI int zoo_aexists(zhandle_t *zh, const char *path, int watch, + stat_completion_t completion, const void *data); + +/** + * \brief checks the existence of a node in zookeeper. + * + * This function is similar to \ref zoo_aexists except it allows one to specify + * a watcher object - a function pointer and associated context. The function + * will be called once the watch has fired. The associated context data will be + * passed to the function as the watcher context parameter. + * + * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init + * \param path the name of the node. Expressed as a file name with slashes + * separating ancestors of the node. + * \param watcher if non-null a watch will be set on the specified znode on the server. + * The watch will be set even if the node does not exist. This allows clients + * to watch for nodes to appear.
+ * \param watcherCtx user specific data, will be passed to the watcher callback. + * Unlike the global context set by \ref zookeeper_init, this watcher context + * is associated with the given instance of the watcher only. + * \param completion the routine to invoke when the request completes. The completion + * will be triggered with one of the following codes passed in as the rc argument: + * ZOK operation completed successfully + * ZNONODE the node does not exist. + * ZNOAUTH the client does not have permission. + * \param data the data that will be passed to the completion routine when the + * function completes. + * \return ZOK on success or one of the following errcodes on failure: + * ZBADARGUMENTS - invalid input parameters + * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE + * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory + */ +ZOOAPI int zoo_awexists(zhandle_t *zh, const char *path, + watcher_fn watcher, void* watcherCtx, + stat_completion_t completion, const void *data); + +/** + * \brief gets the data associated with a node. + * + * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init + * \param path the name of the node. Expressed as a file name with slashes + * separating ancestors of the node. + * \param watch if nonzero, a watch will be set at the server to notify + * the client if the node changes. + * \param completion the routine to invoke when the request completes. The completion + * will be triggered with one of the following codes passed in as the rc argument: + * ZOK operation completed successfully + * ZNONODE the node does not exist. + * ZNOAUTH the client does not have permission. + * \param data the data that will be passed to the completion routine when + * the function completes.
+ * \return ZOK on success or one of the following errcodes on failure: + * ZBADARGUMENTS - invalid input parameters + * ZINVALIDSTATE - zhandle state is either in ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE + * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory + */ +ZOOAPI int zoo_aget(zhandle_t *zh, const char *path, int watch, + data_completion_t completion, const void *data); + +/** + * \brief gets the data associated with a node. + * + * This function is similar to \ref zoo_aget except it allows one to specify + * a watcher object rather than a boolean watch flag. + * + * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init + * \param path the name of the node. Expressed as a file name with slashes + * separating ancestors of the node. + * \param watcher if non-null, a watch will be set at the server to notify + * the client if the node changes. + * \param watcherCtx user specific data, will be passed to the watcher callback. + * Unlike the global context set by \ref zookeeper_init, this watcher context + * is associated with the given instance of the watcher only. + * \param completion the routine to invoke when the request completes. The completion + * will be triggered with one of the following codes passed in as the rc argument: + * ZOK operation completed successfully + * ZNONODE the node does not exist. + * ZNOAUTH the client does not have permission. + * \param data the data that will be passed to the completion routine when + * the function completes.
+ * \return ZOK on success or one of the following errcodes on failure: + * ZBADARGUMENTS - invalid input parameters + * ZINVALIDSTATE - zhandle state is either in ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE + * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory + */ +ZOOAPI int zoo_awget(zhandle_t *zh, const char *path, + watcher_fn watcher, void* watcherCtx, + data_completion_t completion, const void *data); + +/** + * \brief sets the data associated with a node. + * + * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init + * \param path the name of the node. Expressed as a file name with slashes + * separating ancestors of the node. + * \param buffer the buffer holding data to be written to the node. + * \param buflen the number of bytes from buffer to write. + * \param version the expected version of the node. The function will fail if + * the actual version of the node does not match the expected version. If -1 is + * used the version check will not take place. * completion: If null, + * the function will execute synchronously. Otherwise, the function will return + * immediately and invoke the completion routine when the request completes. + * \param completion the routine to invoke when the request completes. The completion + * will be triggered with one of the following codes passed in as the rc argument: + * ZOK operation completed successfully + * ZNONODE the node does not exist. + * ZNOAUTH the client does not have permission. + * ZBADVERSION expected version does not match actual version. + * \param data the data that will be passed to the completion routine when + * the function completes.
+ * \return ZOK on success or one of the following errcodes on failure: + * ZBADARGUMENTS - invalid input parameters + * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE + * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory + */ +ZOOAPI int zoo_aset(zhandle_t *zh, const char *path, const char *buffer, int buflen, + int version, stat_completion_t completion, const void *data); + +/** + * \brief lists the children of a node. + * + * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init + * \param path the name of the node. Expressed as a file name with slashes + * separating ancestors of the node. + * \param watch if nonzero, a watch will be set at the server to notify + * the client if the node changes. + * \param completion the routine to invoke when the request completes. The completion + * will be triggered with one of the following codes passed in as the rc argument: + * ZOK operation completed successfully + * ZNONODE the node does not exist. + * ZNOAUTH the client does not have permission. + * \param data the data that will be passed to the completion routine when + * the function completes. + * \return ZOK on success or one of the following errcodes on failure: + * ZBADARGUMENTS - invalid input parameters + * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE + * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory + */ +ZOOAPI int zoo_aget_children(zhandle_t *zh, const char *path, int watch, + strings_completion_t completion, const void *data); + +/** + * \brief lists the children of a node. + * + * This function is similar to \ref zoo_aget_children except it allows one to specify + * a watcher object rather than a boolean watch flag. + * + * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init + * \param path the name of the node.
Expressed as a file name with slashes + * separating ancestors of the node. + * \param watcher if non-null, a watch will be set at the server to notify + * the client if the node changes. + * \param watcherCtx user specific data, will be passed to the watcher callback. + * Unlike the global context set by \ref zookeeper_init, this watcher context + * is associated with the given instance of the watcher only. + * \param completion the routine to invoke when the request completes. The completion + * will be triggered with one of the following codes passed in as the rc argument: + * ZOK operation completed successfully + * ZNONODE the node does not exist. + * ZNOAUTH the client does not have permission. + * \param data the data that will be passed to the completion routine when + * the function completes. + * \return ZOK on success or one of the following errcodes on failure: + * ZBADARGUMENTS - invalid input parameters + * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE + * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory + */ +ZOOAPI int zoo_awget_children(zhandle_t *zh, const char *path, + watcher_fn watcher, void* watcherCtx, + strings_completion_t completion, const void *data); + +/** + * \brief Flush leader channel. + * + * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init + * \param path the name of the node. Expressed as a file name with slashes + * separating ancestors of the node. + * \param completion the routine to invoke when the request completes. The completion + * will be triggered with one of the following codes passed in as the rc argument: + * ZOK operation completed successfully + * ZNONODE the node does not exist. + * ZNOAUTH the client does not have permission. + * \param data the data that will be passed to the completion routine when + * the function completes.
+ * \return ZOK on success or one of the following errcodes on failure: + * ZBADARGUMENTS - invalid input parameters + * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE + * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory + */ + +ZOOAPI int zoo_async(zhandle_t *zh, const char *path, + string_completion_t completion, const void *data); + + +/** + * \brief gets the acl associated with a node. + * + * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init + * \param path the name of the node. Expressed as a file name with slashes + * separating ancestors of the node. + * \param completion the routine to invoke when the request completes. The completion + * will be triggered with one of the following codes passed in as the rc argument: + * ZOK operation completed successfully + * ZNONODE the node does not exist. + * ZNOAUTH the client does not have permission. + * \param data the data that will be passed to the completion routine when + * the function completes. + * \return ZOK on success or one of the following errcodes on failure: + * ZBADARGUMENTS - invalid input parameters + * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE + * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory + */ +ZOOAPI int zoo_aget_acl(zhandle_t *zh, const char *path, acl_completion_t completion, + const void *data); + +/** + * \brief sets the acl associated with a node. + * + * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init + * \param path the name of the node. Expressed as a file name with slashes + * separating ancestors of the node. + * \param version the expected version of the path. + * \param acl the acl to be set on the path. + * \param completion the routine to invoke when the request completes.
The completion + * will be triggered with one of the following codes passed in as the rc argument: + * ZOK operation completed successfully + * ZNONODE the node does not exist. + * ZNOAUTH the client does not have permission. + * ZINVALIDACL invalid ACL specified + * ZBADVERSION expected version does not match actual version. + * \param data the data that will be passed to the completion routine when + * the function completes. + * \return ZOK on success or one of the following errcodes on failure: + * ZBADARGUMENTS - invalid input parameters + * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE + * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory + */ +ZOOAPI int zoo_aset_acl(zhandle_t *zh, const char *path, int version, + struct ACL_vector *acl, void_completion_t, const void *data); + +/** + * \brief return an error string. + * + * \param c return code + * \return string corresponding to the return code + */ +ZOOAPI const char* zerror(int c); + +/** + * \brief specify application credentials. + * + * The application calls this function to specify its credentials for purposes + * of authentication. The server will use the security provider specified by + * the scheme parameter to authenticate the client connection. If the + * authentication request has failed: + * - the server connection is dropped + * - the watcher is called with the ZOO_AUTH_FAILED_STATE value as the state + * parameter. + * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init + * \param scheme the id of authentication scheme. Natively supported: + * "digest" password-based authentication + * \param cert application credentials. The actual value depends on the scheme. + * \param certLen the length of the cert parameter + * \param completion the routine to invoke when the request completes.
One of + * the following result codes may be passed into the completion callback: + * ZOK operation completed successfully + * ZAUTHFAILED authentication failed + * \param data the data that will be passed to the completion routine when the + * function completes. + * \return ZOK on success or one of the following errcodes on failure: + * ZBADARGUMENTS - invalid input parameters + * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE + * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory + * ZSYSTEMERROR - a system error occurred + */ +ZOOAPI int zoo_add_auth(zhandle_t *zh,const char* scheme,const char* cert, + int certLen, void_completion_t completion, const void *data); + +/** + * \brief checks if the current zookeeper connection state can't be recovered. + * + * The application must close the zhandle and try to reconnect. + * + * \param zh the zookeeper handle (see \ref zookeeper_init) + * \return ZINVALIDSTATE if connection is unrecoverable + */ +ZOOAPI int is_unrecoverable(zhandle_t *zh); + +/** + * \brief sets the debugging level for the library + */ +ZOOAPI void zoo_set_debug_level(ZooLogLevel logLevel); + +/** + * \brief sets the stream to be used by the library for logging + * + * The zookeeper library uses stderr as its default log stream. Application + * must make sure the stream is writable. Passing in NULL resets the stream + * to its default value (stderr). + */ +ZOOAPI void zoo_set_log_stream(FILE* logStream); + +/** + * \brief enable/disable quorum endpoint order randomization + * + * If passed a non-zero value, will make the client connect to quorum peers + * in the order as specified in the zookeeper_init() call. + * A zero value causes zookeeper_init() to permute the peer endpoints + * which is good for more even client connection distribution among the + * quorum peers. + */ +ZOOAPI void zoo_deterministic_conn_order(int yesOrNo); + +/** + * \brief create a node synchronously.
+ * + * This method will create a node in ZooKeeper. A node can only be created if + * it does not already exist. The Create Flags affect the creation of nodes. + * If ZOO_EPHEMERAL flag is set, the node will automatically get removed if the + * client session goes away. If the ZOO_SEQUENCE flag is set, a unique + * monotonically increasing sequence number is appended to the path name. + * + * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init + * \param path The name of the node. Expressed as a file name with slashes + * separating ancestors of the node. + * \param value The data to be stored in the node. + * \param valuelen The number of bytes in data. To set the data to be NULL use + * value as NULL and valuelen as -1. + * \param acl The initial ACL of the node. If null, the ACL of the parent will be + * used. + * \param flags this parameter can be set to 0 for normal create or an OR + * of the Create Flags + * \param path_buffer Buffer which will be filled with the path of the + * new node (this might be different than the supplied path + * because of the ZOO_SEQUENCE flag). The path string will always be + * null-terminated. + * \param path_buffer_len Size of path buffer; if the path of the new + * node (including space for the null terminator) exceeds the buffer size, + * the path string will be truncated to fit. The actual path of the + * new node in the server will not be affected by the truncation. + * The path string will always be null-terminated. + * \return one of the following codes is returned: + * ZOK operation completed successfully + * ZNONODE the parent node does not exist. + * ZNODEEXISTS the node already exists + * ZNOAUTH the client does not have permission. + * ZNOCHILDRENFOREPHEMERALS cannot create children of ephemeral nodes.
+ * ZBADARGUMENTS - invalid input parameters + * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE + * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory + */ +ZOOAPI int zoo_create(zhandle_t *zh, const char *path, const char *value, + int valuelen, const struct ACL_vector *acl, int flags, + char *path_buffer, int path_buffer_len); + +/** + * \brief delete a node in zookeeper synchronously. + * + * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init + * \param path the name of the node. Expressed as a file name with slashes + * separating ancestors of the node. + * \param version the expected version of the node. The function will fail if the + * actual version of the node does not match the expected version. + * If -1 is used the version check will not take place. + * \return one of the following values is returned. + * ZOK operation completed successfully + * ZNONODE the node does not exist. + * ZNOAUTH the client does not have permission. + * ZBADVERSION expected version does not match actual version. + * ZNOTEMPTY children are present; node cannot be deleted. + * ZBADARGUMENTS - invalid input parameters + * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE + * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory + */ +ZOOAPI int zoo_delete(zhandle_t *zh, const char *path, int version); + + +/** + * \brief checks the existence of a node in zookeeper synchronously. + * + * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init + * \param path the name of the node. Expressed as a file name with slashes + * separating ancestors of the node. + * \param watch if nonzero, a watch will be set at the server to notify the + * client if the node changes. The watch will be set even if the node does not + * exist. This allows clients to watch for nodes to appear. + * \param stat the return stat value of the node.
+ * \return return code of the function call. + * ZOK operation completed successfully + * ZNONODE the node does not exist. + * ZNOAUTH the client does not have permission. + * ZBADARGUMENTS - invalid input parameters + * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE + * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory + */ +ZOOAPI int zoo_exists(zhandle_t *zh, const char *path, int watch, struct Stat *stat); + +/** + * \brief checks the existence of a node in zookeeper synchronously. + * + * This function is similar to \ref zoo_exists except it allows one to specify + * a watcher object rather than a boolean watch flag. + * + * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init + * \param path the name of the node. Expressed as a file name with slashes + * separating ancestors of the node. + * \param watcher if non-null a watch will be set on the specified znode on the server. + * The watch will be set even if the node does not exist. This allows clients + * to watch for nodes to appear. + * \param watcherCtx user specific data, will be passed to the watcher callback. + * Unlike the global context set by \ref zookeeper_init, this watcher context + * is associated with the given instance of the watcher only. + * \param stat the return stat value of the node. + * \return return code of the function call. + * ZOK operation completed successfully + * ZNONODE the node does not exist. + * ZNOAUTH the client does not have permission. + * ZBADARGUMENTS - invalid input parameters + * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE + * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory + */ +ZOOAPI int zoo_wexists(zhandle_t *zh, const char *path, + watcher_fn watcher, void* watcherCtx, struct Stat *stat); + +/** + * \brief gets the data associated with a node synchronously.
+ * + * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init + * \param path the name of the node. Expressed as a file name with slashes + * separating ancestors of the node. + * \param watch if nonzero, a watch will be set at the server to notify + * the client if the node changes. + * \param buffer the buffer holding the node data returned by the server + * \param buffer_len is the size of the buffer pointed to by the buffer parameter. + * It'll be set to the actual data length upon return. If the data is NULL, length is -1. + * \param stat if not NULL, will hold the value of stat for the path on return. + * \return return value of the function call. + * ZOK operation completed successfully + * ZNONODE the node does not exist. + * ZNOAUTH the client does not have permission. + * ZBADARGUMENTS - invalid input parameters + * ZINVALIDSTATE - zhandle state is either in ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE + * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory + */ +ZOOAPI int zoo_get(zhandle_t *zh, const char *path, int watch, char *buffer, + int* buffer_len, struct Stat *stat); + +/** + * \brief gets the data associated with a node synchronously. + * + * This function is similar to \ref zoo_get except it allows one to specify + * a watcher object rather than a boolean watch flag. + * + * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init + * \param path the name of the node. Expressed as a file name with slashes + * separating ancestors of the node. + * \param watcher if non-null, a watch will be set at the server to notify + * the client if the node changes. + * \param watcherCtx user specific data, will be passed to the watcher callback. + * Unlike the global context set by \ref zookeeper_init, this watcher context + * is associated with the given instance of the watcher only.
+ * \param buffer the buffer holding the node data returned by the server + * \param buffer_len is the size of the buffer pointed to by the buffer parameter. + * It'll be set to the actual data length upon return. If the data is NULL, length is -1. + * \param stat if not NULL, will hold the value of stat for the path on return. + * \return return value of the function call. + * ZOK operation completed succesfully + * ZNONODE the node does not exist. + * ZNOAUTH the client does not have permission. + * ZBADARGUMENTS - invalid input parameters + * ZINVALIDSTATE - zhandle state is either in ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE + * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory + */ +ZOOAPI int zoo_wget(zhandle_t *zh, const char *path, + watcher_fn watcher, void* watcherCtx, + char *buffer, int* buffer_len, struct Stat *stat); + +/** + * \brief sets the data associated with a node. See zoo_set2 function if + * you require access to the stat information associated with the znode. + * + * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init + * \param path the name of the node. Expressed as a file name with slashes + * separating ancestors of the node. + * \param buffer the buffer holding data to be written to the node. + * \param buflen the number of bytes from buffer to write. To set NULL as data + * use buffer as NULL and buflen as -1. + * \param version the expected version of the node. The function will fail if + * the actual version of the node does not match the expected version. If -1 is + * used the version check will not take place. + * \return the return code for the function call. + * ZOK operation completed succesfully + * ZNONODE the node does not exist. + * ZNOAUTH the client does not have permission. + * ZBADVERSION expected version does not match actual version. 
+ * ZBADARGUMENTS - invalid input parameters + * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE + * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory + */ +ZOOAPI int zoo_set(zhandle_t *zh, const char *path, const char *buffer, + int buflen, int version); + +/** + * \brief sets the data associated with a node. This function is the same + * as zoo_set except that it also provides access to stat information + * associated with the znode. + * + * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init + * \param path the name of the node. Expressed as a file name with slashes + * separating ancestors of the node. + * \param buffer the buffer holding data to be written to the node. + * \param buflen the number of bytes from buffer to write. To set NULL as data + * use buffer as NULL and buflen as -1. + * \param version the expected version of the node. The function will fail if + * the actual version of the node does not match the expected version. If -1 is + * used the version check will not take place. + * \param stat if not NULL, will hold the value of stat for the path on return. + * \return the return code for the function call. + * ZOK operation completed succesfully + * ZNONODE the node does not exist. + * ZNOAUTH the client does not have permission. + * ZBADVERSION expected version does not match actual version. + * ZBADARGUMENTS - invalid input parameters + * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE + * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory + */ +ZOOAPI int zoo_set2(zhandle_t *zh, const char *path, const char *buffer, + int buflen, int version, struct Stat *stat); + +/** + * \brief lists the children of a node synchronously. + * + * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init + * \param path the name of the node. 
Expressed as a file name with slashes + * separating ancestors of the node. + * \param watch if nonzero, a watch will be set at the server to notify + * the client if the node changes. + * \param strings return value of children paths. + * \return the return code of the function. + * ZOK operation completed successfully + * ZNONODE the node does not exist. + * ZNOAUTH the client does not have permission. + * ZBADARGUMENTS - invalid input parameters + * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE + * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory + */ +ZOOAPI int zoo_get_children(zhandle_t *zh, const char *path, int watch, + struct String_vector *strings); + +/** + * \brief lists the children of a node synchronously. + * + * This function is similar to \ref zoo_get_children except it allows one to specify + * a watcher object rather than a boolean watch flag. + * + * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init + * \param path the name of the node. Expressed as a file name with slashes + * separating ancestors of the node. + * \param watcher if non-null, a watch will be set at the server to notify + * the client if the node changes. + * \param watcherCtx user specific data, will be passed to the watcher callback. + * Unlike the global context set by \ref zookeeper_init, this watcher context + * is associated with the given instance of the watcher only. + * \param strings return value of children paths. + * \return the return code of the function. + * ZOK operation completed successfully + * ZNONODE the node does not exist. + * ZNOAUTH the client does not have permission.
+ * ZBADARGUMENTS - invalid input parameters + * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE + * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory + */ +ZOOAPI int zoo_wget_children(zhandle_t *zh, const char *path, + watcher_fn watcher, void* watcherCtx, + struct String_vector *strings); + +/** + * \brief gets the acl associated with a node synchronously. + * + * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init + * \param path the name of the node. Expressed as a file name with slashes + * separating ancestors of the node. + * \param acl the return value of acls on the path. + * \param stat returns the stat of the path specified. + * \return the return code for the function call. + * ZOK operation completed succesfully + * ZNONODE the node does not exist. + * ZNOAUTH the client does not have permission. + * ZBADARGUMENTS - invalid input parameters + * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE + * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory + */ +ZOOAPI int zoo_get_acl(zhandle_t *zh, const char *path, struct ACL_vector *acl, + struct Stat *stat); + +/** + * \brief sets the acl associated with a node synchronously. + * + * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init + * \param path the name of the node. Expressed as a file name with slashes + * separating ancestors of the node. + * \param version the expected version of the path. + * \param acl the acl to be set on the path. + * \return the return code for the function call. + * ZOK operation completed succesfully + * ZNONODE the node does not exist. + * ZNOAUTH the client does not have permission. + * ZINVALIDACL invalid ACL specified + * ZBADVERSION expected version does not match actual version. 
+ * ZBADARGUMENTS - invalid input parameters + * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE + * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory + */ +ZOOAPI int zoo_set_acl(zhandle_t *zh, const char *path, int version, + const struct ACL_vector *acl); + +#ifdef __cplusplus +} +#endif + +#endif /*ZOOKEEPER_H_*/ diff --git a/include/zookeeper/zookeeper.jute.h b/include/zookeeper/zookeeper.jute.h new file mode 100644 index 0000000..2a32934 --- /dev/null +++ b/include/zookeeper/zookeeper.jute.h @@ -0,0 +1,376 @@ +#ifndef __ZOOKEEPER_JUTE__ +#define __ZOOKEEPER_JUTE__ +#include "recordio.h" + +#ifdef __cplusplus +extern "C" { +#endif + +struct Id { + char * scheme; + char * id; +}; +int serialize_Id(struct oarchive *out, const char *tag, struct Id *v); +int deserialize_Id(struct iarchive *in, const char *tag, struct Id*v); +void deallocate_Id(struct Id*); +struct ACL { + int32_t perms; + struct Id id; +}; +int serialize_ACL(struct oarchive *out, const char *tag, struct ACL *v); +int deserialize_ACL(struct iarchive *in, const char *tag, struct ACL*v); +void deallocate_ACL(struct ACL*); +struct Stat { + int64_t czxid; + int64_t mzxid; + int64_t ctime; + int64_t mtime; + int32_t version; + int32_t cversion; + int32_t aversion; + int64_t ephemeralOwner; + int32_t dataLength; + int32_t numChildren; + int64_t pzxid; +}; +int serialize_Stat(struct oarchive *out, const char *tag, struct Stat *v); +int deserialize_Stat(struct iarchive *in, const char *tag, struct Stat*v); +void deallocate_Stat(struct Stat*); +struct StatPersisted { + int64_t czxid; + int64_t mzxid; + int64_t ctime; + int64_t mtime; + int32_t version; + int32_t cversion; + int32_t aversion; + int64_t ephemeralOwner; + int64_t pzxid; +}; +int serialize_StatPersisted(struct oarchive *out, const char *tag, struct StatPersisted *v); +int deserialize_StatPersisted(struct iarchive *in, const char *tag, struct StatPersisted*v); +void 
deallocate_StatPersisted(struct StatPersisted*); +struct StatPersistedV1 { + int64_t czxid; + int64_t mzxid; + int64_t ctime; + int64_t mtime; + int32_t version; + int32_t cversion; + int32_t aversion; + int64_t ephemeralOwner; +}; +int serialize_StatPersistedV1(struct oarchive *out, const char *tag, struct StatPersistedV1 *v); +int deserialize_StatPersistedV1(struct iarchive *in, const char *tag, struct StatPersistedV1*v); +void deallocate_StatPersistedV1(struct StatPersistedV1*); +struct op_result_t { + int32_t rc; + int32_t op; + struct buffer response; +}; +int serialize_op_result_t(struct oarchive *out, const char *tag, struct op_result_t *v); +int deserialize_op_result_t(struct iarchive *in, const char *tag, struct op_result_t*v); +void deallocate_op_result_t(struct op_result_t*); +struct ConnectRequest { + int32_t protocolVersion; + int64_t lastZxidSeen; + int32_t timeOut; + int64_t sessionId; + struct buffer passwd; +}; +int serialize_ConnectRequest(struct oarchive *out, const char *tag, struct ConnectRequest *v); +int deserialize_ConnectRequest(struct iarchive *in, const char *tag, struct ConnectRequest*v); +void deallocate_ConnectRequest(struct ConnectRequest*); +struct ConnectResponse { + int32_t protocolVersion; + int32_t timeOut; + int64_t sessionId; + struct buffer passwd; +}; +int serialize_ConnectResponse(struct oarchive *out, const char *tag, struct ConnectResponse *v); +int deserialize_ConnectResponse(struct iarchive *in, const char *tag, struct ConnectResponse*v); +void deallocate_ConnectResponse(struct ConnectResponse*); +struct String_vector { + int32_t count; + char * *data; +; +}; +int serialize_String_vector(struct oarchive *out, const char *tag, struct String_vector *v); +int deserialize_String_vector(struct iarchive *in, const char *tag, struct String_vector *v); +int allocate_String_vector(struct String_vector *v, int32_t len); +int deallocate_String_vector(struct String_vector *v); +struct SetWatches { + int64_t relativeZxid; + struct 
String_vector dataWatches; + struct String_vector existWatches; + struct String_vector childWatches; +}; +int serialize_SetWatches(struct oarchive *out, const char *tag, struct SetWatches *v); +int deserialize_SetWatches(struct iarchive *in, const char *tag, struct SetWatches*v); +void deallocate_SetWatches(struct SetWatches*); +struct RequestHeader { + int32_t xid; + int32_t type; +}; +int serialize_RequestHeader(struct oarchive *out, const char *tag, struct RequestHeader *v); +int deserialize_RequestHeader(struct iarchive *in, const char *tag, struct RequestHeader*v); +void deallocate_RequestHeader(struct RequestHeader*); +struct AuthPacket { + int32_t type; + char * scheme; + struct buffer auth; +}; +int serialize_AuthPacket(struct oarchive *out, const char *tag, struct AuthPacket *v); +int deserialize_AuthPacket(struct iarchive *in, const char *tag, struct AuthPacket*v); +void deallocate_AuthPacket(struct AuthPacket*); +struct ReplyHeader { + int32_t xid; + int64_t zxid; + int32_t err; +}; +int serialize_ReplyHeader(struct oarchive *out, const char *tag, struct ReplyHeader *v); +int deserialize_ReplyHeader(struct iarchive *in, const char *tag, struct ReplyHeader*v); +void deallocate_ReplyHeader(struct ReplyHeader*); +struct GetDataRequest { + char * path; + int32_t watch; +}; +int serialize_GetDataRequest(struct oarchive *out, const char *tag, struct GetDataRequest *v); +int deserialize_GetDataRequest(struct iarchive *in, const char *tag, struct GetDataRequest*v); +void deallocate_GetDataRequest(struct GetDataRequest*); +struct SetDataRequest { + char * path; + struct buffer data; + int32_t version; +}; +int serialize_SetDataRequest(struct oarchive *out, const char *tag, struct SetDataRequest *v); +int deserialize_SetDataRequest(struct iarchive *in, const char *tag, struct SetDataRequest*v); +void deallocate_SetDataRequest(struct SetDataRequest*); +struct SetDataResponse { + struct Stat stat; +}; +int serialize_SetDataResponse(struct oarchive *out, const char 
*tag, struct SetDataResponse *v); +int deserialize_SetDataResponse(struct iarchive *in, const char *tag, struct SetDataResponse*v); +void deallocate_SetDataResponse(struct SetDataResponse*); +struct ACL_vector { + int32_t count; + struct ACL *data; +; +}; +int serialize_ACL_vector(struct oarchive *out, const char *tag, struct ACL_vector *v); +int deserialize_ACL_vector(struct iarchive *in, const char *tag, struct ACL_vector *v); +int allocate_ACL_vector(struct ACL_vector *v, int32_t len); +int deallocate_ACL_vector(struct ACL_vector *v); +struct CreateRequest { + char * path; + struct buffer data; + struct ACL_vector acl; + int32_t flags; +}; +int serialize_CreateRequest(struct oarchive *out, const char *tag, struct CreateRequest *v); +int deserialize_CreateRequest(struct iarchive *in, const char *tag, struct CreateRequest*v); +void deallocate_CreateRequest(struct CreateRequest*); +struct DeleteRequest { + char * path; + int32_t version; +}; +int serialize_DeleteRequest(struct oarchive *out, const char *tag, struct DeleteRequest *v); +int deserialize_DeleteRequest(struct iarchive *in, const char *tag, struct DeleteRequest*v); +void deallocate_DeleteRequest(struct DeleteRequest*); +struct GetChildrenRequest { + char * path; + int32_t watch; +}; +int serialize_GetChildrenRequest(struct oarchive *out, const char *tag, struct GetChildrenRequest *v); +int deserialize_GetChildrenRequest(struct iarchive *in, const char *tag, struct GetChildrenRequest*v); +void deallocate_GetChildrenRequest(struct GetChildrenRequest*); +struct GetMaxChildrenRequest { + char * path; +}; +int serialize_GetMaxChildrenRequest(struct oarchive *out, const char *tag, struct GetMaxChildrenRequest *v); +int deserialize_GetMaxChildrenRequest(struct iarchive *in, const char *tag, struct GetMaxChildrenRequest*v); +void deallocate_GetMaxChildrenRequest(struct GetMaxChildrenRequest*); +struct GetMaxChildrenResponse { + int32_t max; +}; +int serialize_GetMaxChildrenResponse(struct oarchive *out, const 
char *tag, struct GetMaxChildrenResponse *v); +int deserialize_GetMaxChildrenResponse(struct iarchive *in, const char *tag, struct GetMaxChildrenResponse*v); +void deallocate_GetMaxChildrenResponse(struct GetMaxChildrenResponse*); +struct SetMaxChildrenRequest { + char * path; + int32_t max; +}; +int serialize_SetMaxChildrenRequest(struct oarchive *out, const char *tag, struct SetMaxChildrenRequest *v); +int deserialize_SetMaxChildrenRequest(struct iarchive *in, const char *tag, struct SetMaxChildrenRequest*v); +void deallocate_SetMaxChildrenRequest(struct SetMaxChildrenRequest*); +struct SyncRequest { + char * path; +}; +int serialize_SyncRequest(struct oarchive *out, const char *tag, struct SyncRequest *v); +int deserialize_SyncRequest(struct iarchive *in, const char *tag, struct SyncRequest*v); +void deallocate_SyncRequest(struct SyncRequest*); +struct SyncResponse { + char * path; +}; +int serialize_SyncResponse(struct oarchive *out, const char *tag, struct SyncResponse *v); +int deserialize_SyncResponse(struct iarchive *in, const char *tag, struct SyncResponse*v); +void deallocate_SyncResponse(struct SyncResponse*); +struct GetACLRequest { + char * path; +}; +int serialize_GetACLRequest(struct oarchive *out, const char *tag, struct GetACLRequest *v); +int deserialize_GetACLRequest(struct iarchive *in, const char *tag, struct GetACLRequest*v); +void deallocate_GetACLRequest(struct GetACLRequest*); +struct SetACLRequest { + char * path; + struct ACL_vector acl; + int32_t version; +}; +int serialize_SetACLRequest(struct oarchive *out, const char *tag, struct SetACLRequest *v); +int deserialize_SetACLRequest(struct iarchive *in, const char *tag, struct SetACLRequest*v); +void deallocate_SetACLRequest(struct SetACLRequest*); +struct SetACLResponse { + struct Stat stat; +}; +int serialize_SetACLResponse(struct oarchive *out, const char *tag, struct SetACLResponse *v); +int deserialize_SetACLResponse(struct iarchive *in, const char *tag, struct SetACLResponse*v); 
+void deallocate_SetACLResponse(struct SetACLResponse*); +struct WatcherEvent { + int32_t type; + int32_t state; + char * path; +}; +int serialize_WatcherEvent(struct oarchive *out, const char *tag, struct WatcherEvent *v); +int deserialize_WatcherEvent(struct iarchive *in, const char *tag, struct WatcherEvent*v); +void deallocate_WatcherEvent(struct WatcherEvent*); +struct CreateResponse { + char * path; +}; +int serialize_CreateResponse(struct oarchive *out, const char *tag, struct CreateResponse *v); +int deserialize_CreateResponse(struct iarchive *in, const char *tag, struct CreateResponse*v); +void deallocate_CreateResponse(struct CreateResponse*); +struct ExistsRequest { + char * path; + int32_t watch; +}; +int serialize_ExistsRequest(struct oarchive *out, const char *tag, struct ExistsRequest *v); +int deserialize_ExistsRequest(struct iarchive *in, const char *tag, struct ExistsRequest*v); +void deallocate_ExistsRequest(struct ExistsRequest*); +struct ExistsResponse { + struct Stat stat; +}; +int serialize_ExistsResponse(struct oarchive *out, const char *tag, struct ExistsResponse *v); +int deserialize_ExistsResponse(struct iarchive *in, const char *tag, struct ExistsResponse*v); +void deallocate_ExistsResponse(struct ExistsResponse*); +struct GetDataResponse { + struct buffer data; + struct Stat stat; +}; +int serialize_GetDataResponse(struct oarchive *out, const char *tag, struct GetDataResponse *v); +int deserialize_GetDataResponse(struct iarchive *in, const char *tag, struct GetDataResponse*v); +void deallocate_GetDataResponse(struct GetDataResponse*); +struct GetChildrenResponse { + struct String_vector children; +}; +int serialize_GetChildrenResponse(struct oarchive *out, const char *tag, struct GetChildrenResponse *v); +int deserialize_GetChildrenResponse(struct iarchive *in, const char *tag, struct GetChildrenResponse*v); +void deallocate_GetChildrenResponse(struct GetChildrenResponse*); +struct GetACLResponse { + struct ACL_vector acl; + struct Stat 
stat; +}; +int serialize_GetACLResponse(struct oarchive *out, const char *tag, struct GetACLResponse *v); +int deserialize_GetACLResponse(struct iarchive *in, const char *tag, struct GetACLResponse*v); +void deallocate_GetACLResponse(struct GetACLResponse*); +struct Id_vector { + int32_t count; + struct Id *data; +; +}; +int serialize_Id_vector(struct oarchive *out, const char *tag, struct Id_vector *v); +int deserialize_Id_vector(struct iarchive *in, const char *tag, struct Id_vector *v); +int allocate_Id_vector(struct Id_vector *v, int32_t len); +int deallocate_Id_vector(struct Id_vector *v); +struct QuorumPacket { + int32_t type; + int64_t zxid; + struct buffer data; + struct Id_vector authinfo; +}; +int serialize_QuorumPacket(struct oarchive *out, const char *tag, struct QuorumPacket *v); +int deserialize_QuorumPacket(struct iarchive *in, const char *tag, struct QuorumPacket*v); +void deallocate_QuorumPacket(struct QuorumPacket*); +struct FileHeader { + int32_t magic; + int32_t version; + int64_t dbid; +}; +int serialize_FileHeader(struct oarchive *out, const char *tag, struct FileHeader *v); +int deserialize_FileHeader(struct iarchive *in, const char *tag, struct FileHeader*v); +void deallocate_FileHeader(struct FileHeader*); +struct TxnHeader { + int64_t clientId; + int32_t cxid; + int64_t zxid; + int64_t time; + int32_t type; +}; +int serialize_TxnHeader(struct oarchive *out, const char *tag, struct TxnHeader *v); +int deserialize_TxnHeader(struct iarchive *in, const char *tag, struct TxnHeader*v); +void deallocate_TxnHeader(struct TxnHeader*); +struct CreateTxn { + char * path; + struct buffer data; + struct ACL_vector acl; + int32_t ephemeral; +}; +int serialize_CreateTxn(struct oarchive *out, const char *tag, struct CreateTxn *v); +int deserialize_CreateTxn(struct iarchive *in, const char *tag, struct CreateTxn*v); +void deallocate_CreateTxn(struct CreateTxn*); +struct DeleteTxn { + char * path; +}; +int serialize_DeleteTxn(struct oarchive *out, const 
char *tag, struct DeleteTxn *v); +int deserialize_DeleteTxn(struct iarchive *in, const char *tag, struct DeleteTxn*v); +void deallocate_DeleteTxn(struct DeleteTxn*); +struct SetDataTxn { + char * path; + struct buffer data; + int32_t version; +}; +int serialize_SetDataTxn(struct oarchive *out, const char *tag, struct SetDataTxn *v); +int deserialize_SetDataTxn(struct iarchive *in, const char *tag, struct SetDataTxn*v); +void deallocate_SetDataTxn(struct SetDataTxn*); +struct SetACLTxn { + char * path; + struct ACL_vector acl; + int32_t version; +}; +int serialize_SetACLTxn(struct oarchive *out, const char *tag, struct SetACLTxn *v); +int deserialize_SetACLTxn(struct iarchive *in, const char *tag, struct SetACLTxn*v); +void deallocate_SetACLTxn(struct SetACLTxn*); +struct SetMaxChildrenTxn { + char * path; + int32_t max; +}; +int serialize_SetMaxChildrenTxn(struct oarchive *out, const char *tag, struct SetMaxChildrenTxn *v); +int deserialize_SetMaxChildrenTxn(struct iarchive *in, const char *tag, struct SetMaxChildrenTxn*v); +void deallocate_SetMaxChildrenTxn(struct SetMaxChildrenTxn*); +struct CreateSessionTxn { + int32_t timeOut; +}; +int serialize_CreateSessionTxn(struct oarchive *out, const char *tag, struct CreateSessionTxn *v); +int deserialize_CreateSessionTxn(struct iarchive *in, const char *tag, struct CreateSessionTxn*v); +void deallocate_CreateSessionTxn(struct CreateSessionTxn*); +struct ErrorTxn { + int32_t err; +}; +int serialize_ErrorTxn(struct oarchive *out, const char *tag, struct ErrorTxn *v); +int deserialize_ErrorTxn(struct iarchive *in, const char *tag, struct ErrorTxn*v); +void deallocate_ErrorTxn(struct ErrorTxn*); + +#ifdef __cplusplus +} +#endif + +#endif //ZOOKEEPER_JUTE__ diff --git a/include/zookeeper/zookeeper_log.h b/include/zookeeper/zookeeper_log.h new file mode 100644 index 0000000..e5917cb --- /dev/null +++ b/include/zookeeper/zookeeper_log.h @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ZK_LOG_H_ +#define ZK_LOG_H_ + +#include <zookeeper.h> + +#ifdef __cplusplus +extern "C" { +#endif + +extern ZOOAPI ZooLogLevel logLevel; +#define LOGSTREAM getLogStream() + +#define LOG_ERROR(x) if(logLevel>=ZOO_LOG_LEVEL_ERROR) \ + log_message(ZOO_LOG_LEVEL_ERROR,__LINE__,__func__,format_log_message x) +#define LOG_WARN(x) if(logLevel>=ZOO_LOG_LEVEL_WARN) \ + log_message(ZOO_LOG_LEVEL_WARN,__LINE__,__func__,format_log_message x) +#define LOG_INFO(x) if(logLevel>=ZOO_LOG_LEVEL_INFO) \ + log_message(ZOO_LOG_LEVEL_INFO,__LINE__,__func__,format_log_message x) +#define LOG_DEBUG(x) if(logLevel==ZOO_LOG_LEVEL_DEBUG) \ + log_message(ZOO_LOG_LEVEL_DEBUG,__LINE__,__func__,format_log_message x) + +ZOOAPI void log_message(ZooLogLevel curLevel, int line,const char* funcName, + const char* message); + +ZOOAPI const char* format_log_message(const char* format,...); + +FILE* getLogStream(); + +#ifdef __cplusplus +} +#endif + +#endif /*ZK_LOG_H_*/ diff --git a/include/zookeeper/zookeeper_version.h b/include/zookeeper/zookeeper_version.h new file mode 100644 index 0000000..b0f88bc --- /dev/null +++ b/include/zookeeper/zookeeper_version.h @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef ZOOKEEPER_VERSION_H_ +#define ZOOKEEPER_VERSION_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#define ZOO_MAJOR_VERSION 3 +#define ZOO_MINOR_VERSION 2 +#define ZOO_PATCH_VERSION 1 + +#ifdef __cplusplus +} +#endif + +#endif /* ZOOKEEPER_VERSION_H_ */ diff --git a/ulogd.conf.in b/ulogd.conf.in index f7bce51..4e44de0 100644 --- a/ulogd.conf.in +++ b/ulogd.conf.in @@ -57,23 +57,20 @@ plugin="@libdir@/ulogd_DRL.so" # It will never go above this rate, even if capacity is # available. A value of 0 means unlimited. nodelimit=0 - -# Leave this at FPS right now... +# DRL allocation policy: FPS for flow proportional share +# or GRD for global random drop. policy=FPS - -# Smallest interval at which identities are processed. +# estintms is the system tick time (estimate interval), +# in milliseconds. estintms=500 - -# Log file location. -drl_logfile="/var/log/drl.log" - +# The location of the DRL logfile. +drl_logfile="/var/log/ulogd-drl.log" # The verbosity of the DRL logfile. 1 = most verbose, # 3 = errors only. -drl_loglevel 2 - -# Location of the DRL xml configuration file. -drl_configfile="/etc/drl.xml" - +drl_loglevel=3 +# The location of the DRL configuration file, which +# specifies which identities should be installed. 
+drl_configfile="@etcdir@/drl.xml" [NETFLOW] # PlanetLab NetFlow logging diff --git a/ulogd.spec b/ulogd.spec index 7ab2204..c4df9fc 100644 --- a/ulogd.spec +++ b/ulogd.spec @@ -99,6 +99,7 @@ rm -rf %{buildroot} %attr(0755,root,root) %{_sbindir}/ulogd #%attr(0755,root,root) %{_bindir}/netflow-import %{_sysconfdir}/ulogd.conf +%{_sysconfdir}/drl.xml %{_sysconfdir}/logrotate.d/ulogd %attr(0755,root,root) %{_sysconfdir}/rc.d/init.d/ulogd %{_mandir}/man8/* -- 2.43.0