jsonrpc-server: Disconnect connections that queue too much data.
[sliver-openvswitch.git] / ovsdb / jsonrpc-server.c
index 6e0a6f6..febc351 100644 (file)
@@ -82,6 +82,8 @@ static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
     struct json_array *params,
     const struct json *request_id);
 static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *);
+static size_t ovsdb_jsonrpc_monitor_json_length_all(
+    struct ovsdb_jsonrpc_session *);
 \f
 /* JSON-RPC database server. */
 
@@ -335,6 +337,8 @@ struct ovsdb_jsonrpc_session {
     struct list node;           /* Element in remote's sessions list. */
     struct ovsdb_session up;
     struct ovsdb_jsonrpc_remote *remote;
+    size_t backlog_threshold;   /* See ovsdb_jsonrpc_session_run(). */
+    size_t reply_backlog;
 
     /* Triggers. */
     struct hmap triggers;       /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
@@ -371,6 +375,8 @@ ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
     list_push_back(&remote->sessions, &s->node);
     hmap_init(&s->triggers);
     hmap_init(&s->monitors);
+    s->reply_backlog = 0;
+    s->backlog_threshold = 1024 * 1024;
     s->js = js;
     s->js_seqno = jsonrpc_session_get_seqno(js);
 
@@ -399,6 +405,8 @@ ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
 static int
 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
 {
+    size_t backlog;
+
     jsonrpc_session_run(s->js);
     if (s->js_seqno != jsonrpc_session_get_seqno(s->js)) {
         s->js_seqno = jsonrpc_session_get_seqno(s->js);
@@ -409,7 +417,8 @@ ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
 
     ovsdb_jsonrpc_trigger_complete_done(s);
 
-    if (!jsonrpc_session_get_backlog(s->js)) {
+    backlog = jsonrpc_session_get_backlog(s->js);
+    if (!backlog) {
         struct jsonrpc_msg *msg = jsonrpc_session_recv(s->js);
         if (msg) {
             if (msg->type == JSONRPC_REQUEST) {
@@ -424,6 +433,39 @@ ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
                 jsonrpc_msg_destroy(msg);
             }
         }
+        s->reply_backlog = jsonrpc_session_get_backlog(s->js);
+    } else if (backlog > s->reply_backlog + s->backlog_threshold) {
+        /* We have a lot of data queued to send to the client.  The data is
+         * likely to be mostly monitor updates.  It is unlikely that the
+         * monitor updates are due to transactions by 's', because we will not
+         * let 's' make any more transactions until it drains its backlog to 0
+         * (see previous 'if' case).  So the monitor updates are probably due
+         * to transactions made by database clients other than 's'.  We can't
+         * fix that by preventing 's' from executing more transactions.  We
+         * could fix it by preventing every client from executing transactions,
+         * but then one slow or hung client could prevent other clients from
+         * doing useful work.
+         *
+         * Our solution is to cap the maximum backlog to O(1) in the amount of
+         * data in the database.  If the backlog exceeds that amount, then we
+         * disconnect the client.  When it reconnects, it can fetch the entire
+         * contents of the database using less data than was previously
+         * backlogged. */
+        size_t monitor_length;
+
+        monitor_length = ovsdb_jsonrpc_monitor_json_length_all(s);
+        if (backlog > s->reply_backlog + monitor_length * 2) {
+            VLOG_INFO("%s: %zu bytes backlogged but a complete replica "
+                      "would only take %zu bytes, disconnecting",
+                      jsonrpc_session_get_name(s->js),
+                      backlog - s->reply_backlog, monitor_length);
+            jsonrpc_session_force_reconnect(s->js);
+        } else {
+            /* The backlog is not unreasonably big.  Only check again after it
+             * becomes much bigger. */
+            s->backlog_threshold = 2 * MAX(s->backlog_threshold * 2,
+                                           monitor_length);
+        }
     }
     return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT;
 }
@@ -1023,6 +1065,8 @@ struct ovsdb_jsonrpc_monitor *ovsdb_jsonrpc_monitor_find(
 static void ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *);
 static struct json *ovsdb_jsonrpc_monitor_get_initial(
     const struct ovsdb_jsonrpc_monitor *);
+static size_t ovsdb_jsonrpc_monitor_json_length(
+    const struct ovsdb_jsonrpc_monitor *);
 
 static bool
 parse_bool(struct ovsdb_parser *parser, const char *name, bool default_value)
@@ -1292,6 +1336,22 @@ ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
     }
 }
 
+/* Returns an overestimate of the number of bytes of JSON data required to
+ * report the current contents of the database over all the monitors currently
+ * configured in 's'.  */
+static size_t
+ovsdb_jsonrpc_monitor_json_length_all(struct ovsdb_jsonrpc_session *s)
+{
+    struct ovsdb_jsonrpc_monitor *m;
+    size_t length;
+
+    length = 0;
+    HMAP_FOR_EACH (m, node, &s->monitors) {
+        length += ovsdb_jsonrpc_monitor_json_length(m);
+    }
+    return length;
+}
+
 static struct ovsdb_jsonrpc_monitor *
 ovsdb_jsonrpc_monitor_cast(struct ovsdb_replica *replica)
 {
@@ -1427,6 +1487,52 @@ ovsdb_jsonrpc_monitor_change_cb(const struct ovsdb_row *old,
     return true;
 }
 
+/* Returns an overestimate of the number of bytes of JSON data required to
+ * report the current contents of the database over monitor 'm'. */
+static size_t
+ovsdb_jsonrpc_monitor_json_length(const struct ovsdb_jsonrpc_monitor *m)
+{
+    const struct shash_node *node;
+    size_t length;
+
+    /* Top-level overhead of monitor JSON. */
+    length = 256;
+
+    SHASH_FOR_EACH (node, &m->tables) {
+        const struct ovsdb_jsonrpc_monitor_table *mt = node->data;
+        const struct ovsdb_table *table = mt->table;
+        const struct ovsdb_row *row;
+        size_t i;
+
+        /* Per-table JSON overhead: "<table>":{...}. */
+        length += strlen(table->schema->name) + 32;
+
+        /* Per-row JSON overhead: ,"<uuid>":{"old":{...},"new":{...}} */
+        length += hmap_count(&table->rows) * (UUID_LEN + 32);
+
+        /* Per-row, per-column JSON overhead: ,"<column>": */
+        for (i = 0; i < mt->n_columns; i++) {
+            const struct ovsdb_jsonrpc_monitor_column *c = &mt->columns[i];
+            const struct ovsdb_column *column = c->column;
+
+            length += hmap_count(&table->rows) * (8 + strlen(column->name));
+        }
+
+        /* Data. */
+        HMAP_FOR_EACH (row, hmap_node, &table->rows) {
+            for (i = 0; i < mt->n_columns; i++) {
+                const struct ovsdb_jsonrpc_monitor_column *c = &mt->columns[i];
+                const struct ovsdb_column *column = c->column;
+
+                length += ovsdb_datum_json_length(&row->fields[column->index],
+                                                  &column->type);
+            }
+        }
+    }
+
+    return length;
+}
+
 static void
 ovsdb_jsonrpc_monitor_init_aux(struct ovsdb_jsonrpc_monitor_aux *aux,
                                const struct ovsdb_jsonrpc_monitor *m,