-/* Copyright (c) 2009, 2010, 2011, 2012 Nicira, Inc.
+/* Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
struct json_array *params,
const struct json *request_id);
static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *);
+static size_t ovsdb_jsonrpc_monitor_json_length_all(
+ struct ovsdb_jsonrpc_session *);
\f
/* JSON-RPC database server. */
bool
ovsdb_jsonrpc_server_add_db(struct ovsdb_jsonrpc_server *svr, struct ovsdb *db)
{
+ /* The OVSDB protocol doesn't have a way to notify a client that a
+ * database has been added. If some client tried to use the database
+ * that we're adding and failed, then forcing it to reconnect seems like
+ * a reasonable way to make it try again.
+ *
+ * If this is too big of a hammer in practice, we could be more selective,
+ * e.g. disconnect only connections that actually tried to use a database
+ * with 'db''s name. */
+ ovsdb_jsonrpc_server_reconnect(svr);
+
return ovsdb_server_add_db(&svr->up, db);
}
+/* Removes 'db' from the set of databases served out by 'svr'. Returns
+ * true if successful, false if there is no database associated with 'db'. */
+bool
+ovsdb_jsonrpc_server_remove_db(struct ovsdb_jsonrpc_server *svr,
+ struct ovsdb *db)
+{
+ /* There might be pointers to 'db' from 'svr', such as monitors or
+ * outstanding transactions. Disconnect all JSON-RPC connections to avoid
+ * accesses to freed memory.
+ *
+ * If this is too big of a hammer in practice, we could be more selective,
+ * e.g. disconnect only connections that actually reference 'db'. */
+ ovsdb_jsonrpc_server_reconnect(svr);
+
+ return ovsdb_server_remove_db(&svr->up, db);
+}
+
void
ovsdb_jsonrpc_server_destroy(struct ovsdb_jsonrpc_server *svr)
{
error = jsonrpc_pstream_open(name, &listener, options->dscp);
if (error && error != EAFNOSUPPORT) {
- VLOG_ERR_RL(&rl, "%s: listen failed: %s", name, strerror(error));
+ VLOG_ERR_RL(&rl, "%s: listen failed: %s", name, ovs_strerror(error));
return NULL;
}
shash_add(&svr->remotes, name, remote);
if (!listener) {
- ovsdb_jsonrpc_session_create(remote, jsonrpc_session_open(name));
+ ovsdb_jsonrpc_session_create(remote, jsonrpc_session_open(name, true));
}
return remote;
}
} else if (error != EAGAIN) {
VLOG_WARN_RL(&rl, "%s: accept failed: %s",
pstream_get_name(remote->listener),
- strerror(error));
+ ovs_strerror(error));
}
}
struct list node; /* Element in remote's sessions list. */
struct ovsdb_session up;
struct ovsdb_jsonrpc_remote *remote;
+ size_t backlog_threshold; /* See ovsdb_jsonrpc_session_run(). */
+ size_t reply_backlog;
/* Triggers. */
struct hmap triggers; /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
list_push_back(&remote->sessions, &s->node);
hmap_init(&s->triggers);
hmap_init(&s->monitors);
+ s->reply_backlog = 0;
+ s->backlog_threshold = 1024 * 1024;
s->js = js;
s->js_seqno = jsonrpc_session_get_seqno(js);
{
ovsdb_jsonrpc_monitor_remove_all(s);
ovsdb_jsonrpc_session_unlock_all(s);
+ ovsdb_jsonrpc_trigger_complete_all(s);
+
+ hmap_destroy(&s->monitors);
+ hmap_destroy(&s->triggers);
+
jsonrpc_session_close(s->js);
list_remove(&s->node);
- ovsdb_session_destroy(&s->up);
s->remote->server->n_sessions--;
ovsdb_session_destroy(&s->up);
free(s);
static int
ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
{
+ size_t backlog;
+
jsonrpc_session_run(s->js);
if (s->js_seqno != jsonrpc_session_get_seqno(s->js)) {
s->js_seqno = jsonrpc_session_get_seqno(s->js);
ovsdb_jsonrpc_trigger_complete_done(s);
- if (!jsonrpc_session_get_backlog(s->js)) {
+ backlog = jsonrpc_session_get_backlog(s->js);
+ if (!backlog) {
struct jsonrpc_msg *msg = jsonrpc_session_recv(s->js);
if (msg) {
if (msg->type == JSONRPC_REQUEST) {
jsonrpc_msg_destroy(msg);
}
}
+ s->reply_backlog = jsonrpc_session_get_backlog(s->js);
+ } else if (backlog > s->reply_backlog + s->backlog_threshold) {
+ /* We have a lot of data queued to send to the client. The data is
+ * likely to be mostly monitor updates. It is unlikely that the
+ * monitor updates are due to transactions by 's', because we will not
+ * let 's' make any more transactions until it drains its backlog to 0
+ * (see previous 'if' case). So the monitor updates are probably due
+ * to transactions made by database clients other than 's'. We can't
+ * fix that by preventing 's' from executing more transactions. We
+ * could fix it by preventing every client from executing transactions,
+ * but then one slow or hung client could prevent other clients from
+ * doing useful work.
+ *
+ * Our solution is to cap the maximum backlog to O(1) in the amount of
+ * data in the database. If the backlog exceeds that amount, then we
+ * disconnect the client. When it reconnects, it can fetch the entire
+ * contents of the database using less data than was previously
+ * backlogged. */
+ size_t monitor_length;
+
+ monitor_length = ovsdb_jsonrpc_monitor_json_length_all(s);
+ if (backlog > s->reply_backlog + monitor_length * 2) {
+ VLOG_INFO("%s: %"PRIuSIZE" bytes backlogged but a complete replica "
+ "would only take %"PRIuSIZE" bytes, disconnecting",
+ jsonrpc_session_get_name(s->js),
+ backlog - s->reply_backlog, monitor_length);
+ jsonrpc_session_force_reconnect(s->js);
+ } else {
+ /* The backlog is not unreasonably big. Only check again after it
+ * becomes much bigger. */
+ s->backlog_threshold = 2 * MAX(s->backlog_threshold * 2,
+ monitor_length);
+ }
}
return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT;
}
error = pstream_set_dscp(remote->listener, options->dscp);
if (error) {
VLOG_ERR("%s: set_dscp failed %s",
- pstream_get_name(remote->listener), strerror(error));
+ pstream_get_name(remote->listener), ovs_strerror(error));
} else {
remote->dscp = options->dscp;
}
struct reconnect_stats rstats;
struct ds locks_held, locks_waiting, locks_lost;
+ status->bound_port = (remote->listener
+ ? pstream_get_bound_port(remote->listener)
+ : htons(0));
+
if (list_is_empty(&remote->sessions)) {
return false;
}
static void ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *);
static struct json *ovsdb_jsonrpc_monitor_get_initial(
const struct ovsdb_jsonrpc_monitor *);
+static size_t ovsdb_jsonrpc_monitor_json_length(
+ const struct ovsdb_jsonrpc_monitor *);
static bool
parse_bool(struct ovsdb_parser *parser, const char *name, bool default_value)
}
}
+/* Returns an overestimate of the number of bytes of JSON data required to
+ * report the current contents of the database over all the monitors currently
+ * configured in 's'. */
+static size_t
+ovsdb_jsonrpc_monitor_json_length_all(struct ovsdb_jsonrpc_session *s)
+{
+ struct ovsdb_jsonrpc_monitor *m;
+ size_t length;
+
+ length = 0;
+ HMAP_FOR_EACH (m, node, &s->monitors) {
+ length += ovsdb_jsonrpc_monitor_json_length(m);
+ }
+ return length;
+}
+
static struct ovsdb_jsonrpc_monitor *
ovsdb_jsonrpc_monitor_cast(struct ovsdb_replica *replica)
{
return true;
}
+/* Returns an overestimate of the number of bytes of JSON data required to
+ * report the current contents of the database over monitor 'm'. */
+static size_t
+ovsdb_jsonrpc_monitor_json_length(const struct ovsdb_jsonrpc_monitor *m)
+{
+ const struct shash_node *node;
+ size_t length;
+
+ /* Top-level overhead of monitor JSON. */
+ length = 256;
+
+ SHASH_FOR_EACH (node, &m->tables) {
+ const struct ovsdb_jsonrpc_monitor_table *mt = node->data;
+ const struct ovsdb_table *table = mt->table;
+ const struct ovsdb_row *row;
+ size_t i;
+
+ /* Per-table JSON overhead: "<table>":{...}. */
+ length += strlen(table->schema->name) + 32;
+
+ /* Per-row JSON overhead: ,"<uuid>":{"old":{...},"new":{...}} */
+ length += hmap_count(&table->rows) * (UUID_LEN + 32);
+
+ /* Per-row, per-column JSON overhead: ,"<column>": */
+ for (i = 0; i < mt->n_columns; i++) {
+ const struct ovsdb_jsonrpc_monitor_column *c = &mt->columns[i];
+ const struct ovsdb_column *column = c->column;
+
+ length += hmap_count(&table->rows) * (8 + strlen(column->name));
+ }
+
+ /* Data. */
+ HMAP_FOR_EACH (row, hmap_node, &table->rows) {
+ for (i = 0; i < mt->n_columns; i++) {
+ const struct ovsdb_jsonrpc_monitor_column *c = &mt->columns[i];
+ const struct ovsdb_column *column = c->column;
+
+ length += ovsdb_datum_json_length(&row->fields[column->index],
+ &column->type);
+ }
+ }
+ }
+
+ return length;
+}
+
static void
ovsdb_jsonrpc_monitor_init_aux(struct ovsdb_jsonrpc_monitor_aux *aux,
const struct ovsdb_jsonrpc_monitor *m,