Catalli's threaded switch
[sliver-openvswitch.git] / ovsdb / jsonrpc-server.c
index 26fabe5..71a4489 100644 (file)
@@ -20,6 +20,7 @@
 #include <assert.h>
 #include <errno.h>
 
+#include "bitmap.h"
 #include "column.h"
 #include "json.h"
 #include "jsonrpc.h"
 #include "timeval.h"
 #include "transaction.h"
 #include "trigger.h"
-
-#define THIS_MODULE VLM_ovsdb_jsonrpc_server
 #include "vlog.h"
 
+VLOG_DEFINE_THIS_MODULE(ovsdb_jsonrpc_server)
+
 struct ovsdb_jsonrpc_remote;
 struct ovsdb_jsonrpc_session;
 
@@ -49,6 +50,7 @@ static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create(
 static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *);
 static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *);
 static void ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *);
+static void ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *);
 
 /* Triggers. */
 static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
@@ -143,7 +145,7 @@ ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
     struct pstream *listener;
     int error;
 
-    error = pstream_open(name, &listener);
+    error = jsonrpc_pstream_open(name, &listener);
     if (error && error != EAFNOSUPPORT) {
         VLOG_ERR_RL(&rl, "%s: listen failed: %s", name, strerror(error));
         return;
@@ -171,6 +173,20 @@ ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
     free(remote);
 }
 
+/* Forces all of the JSON-RPC sessions managed by 'svr' to disconnect and
+ * reconnect. */
+void
+ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server *svr)
+{
+    struct shash_node *node;
+
+    SHASH_FOR_EACH (node, &svr->remotes) {
+        struct ovsdb_jsonrpc_remote *remote = node->data;
+
+        ovsdb_jsonrpc_session_reconnect_all(remote);
+    }
+}
+
 void
 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
 {
@@ -346,6 +362,22 @@ ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *remote)
     }
 }
 
+/* Forces all of the JSON-RPC sessions managed by 'remote' to disconnect and
+ * reconnect. */
+static void
+ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *remote)
+{
+    struct ovsdb_jsonrpc_session *s, *next;
+
+    LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
+                        &remote->sessions) {
+        jsonrpc_session_force_reconnect(s->js);
+        if (!jsonrpc_session_is_alive(s->js)) {
+            ovsdb_jsonrpc_session_close(s);
+        }
+    }
+}
+
 static const char *
 get_db_name(const struct ovsdb_jsonrpc_session *s)
 {
@@ -587,12 +619,26 @@ enum ovsdb_jsonrpc_monitor_selection {
     OJMS_MODIFY = 1 << 3        /* Modified rows. */
 };
 
+/* A particular column being monitored. */
+struct ovsdb_jsonrpc_monitor_column {
+    const struct ovsdb_column *column;
+    enum ovsdb_jsonrpc_monitor_selection select;
+};
+
+/* A particular table being monitored. */
 struct ovsdb_jsonrpc_monitor_table {
     const struct ovsdb_table *table;
+
+    /* This is the union (bitwise-OR) of the 'select' values in all of the
+     * members of 'columns' below. */
     enum ovsdb_jsonrpc_monitor_selection select;
-    struct ovsdb_column_set columns;
+
+    /* Columns being monitored. */
+    struct ovsdb_jsonrpc_monitor_column *columns;
+    size_t n_columns;
 };
 
+/* A collection of tables being monitored. */
 struct ovsdb_jsonrpc_monitor {
     struct ovsdb_replica replica;
     struct ovsdb_jsonrpc_session *session;
@@ -635,6 +681,118 @@ ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session *s,
     return NULL;
 }
 
+static void
+ovsdb_jsonrpc_add_monitor_column(struct ovsdb_jsonrpc_monitor_table *mt,
+                                 const struct ovsdb_column *column,
+                                 enum ovsdb_jsonrpc_monitor_selection select,
+                                 size_t *allocated_columns)
+{
+    struct ovsdb_jsonrpc_monitor_column *c;
+
+    if (mt->n_columns >= *allocated_columns) {
+        mt->columns = x2nrealloc(mt->columns, allocated_columns,
+                                 sizeof *mt->columns);
+    }
+
+    c = &mt->columns[mt->n_columns++];
+    c->column = column;
+    c->select = select;
+}
+
+static int
+compare_ovsdb_jsonrpc_monitor_column(const void *a_, const void *b_)
+{
+    const struct ovsdb_jsonrpc_monitor_column *a = a_;
+    const struct ovsdb_jsonrpc_monitor_column *b = b_;
+
+    return a->column < b->column ? -1 : a->column > b->column;
+}
+
+static struct ovsdb_error * WARN_UNUSED_RESULT
+ovsdb_jsonrpc_parse_monitor_request(struct ovsdb_jsonrpc_monitor_table *mt,
+                                    const struct json *monitor_request,
+                                    size_t *allocated_columns)
+{
+    const struct ovsdb_table_schema *ts = mt->table->schema;
+    enum ovsdb_jsonrpc_monitor_selection select;
+    const struct json *columns, *select_json;
+    struct ovsdb_parser parser;
+    struct ovsdb_error *error;
+
+    ovsdb_parser_init(&parser, monitor_request, "table %s", ts->name);
+    columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | OP_OPTIONAL);
+    select_json = ovsdb_parser_member(&parser, "select",
+                                      OP_OBJECT | OP_OPTIONAL);
+    error = ovsdb_parser_finish(&parser);
+    if (error) {
+        return error;
+    }
+
+    if (select_json) {
+        select = 0;
+        ovsdb_parser_init(&parser, select_json, "table %s select", ts->name);
+        if (parse_bool(&parser, "initial", true)) {
+            select |= OJMS_INITIAL;
+        }
+        if (parse_bool(&parser, "insert", true)) {
+            select |= OJMS_INSERT;
+        }
+        if (parse_bool(&parser, "delete", true)) {
+            select |= OJMS_DELETE;
+        }
+        if (parse_bool(&parser, "modify", true)) {
+            select |= OJMS_MODIFY;
+        }
+        error = ovsdb_parser_finish(&parser);
+        if (error) {
+            return error;
+        }
+    } else {
+        select = OJMS_INITIAL | OJMS_INSERT | OJMS_DELETE | OJMS_MODIFY;
+    }
+    mt->select |= select;
+
+    if (columns) {
+        size_t i;
+
+        if (columns->type != JSON_ARRAY) {
+            return ovsdb_syntax_error(columns, NULL,
+                                      "array of column names expected");
+        }
+
+        for (i = 0; i < columns->u.array.n; i++) {
+            const struct ovsdb_column *column;
+            const char *s;
+
+            if (columns->u.array.elems[i]->type != JSON_STRING) {
+                return ovsdb_syntax_error(columns, NULL,
+                                          "array of column names expected");
+            }
+
+            s = columns->u.array.elems[i]->u.string;
+            column = shash_find_data(&mt->table->schema->columns, s);
+            if (!column) {
+                return ovsdb_syntax_error(columns, NULL, "%s is not a valid "
+                                          "column name", s);
+            }
+            ovsdb_jsonrpc_add_monitor_column(mt, column, select,
+                                             allocated_columns);
+        }
+    } else {
+        struct shash_node *node;
+
+        SHASH_FOR_EACH (node, &ts->columns) {
+            const struct ovsdb_column *column = node->data;
+            if (column->index != OVSDB_COL_UUID) {
+                ovsdb_jsonrpc_add_monitor_column(mt, column, select,
+                                                 allocated_columns);
+            }
+        }
+    }
+
+    return NULL;
+}
+
 static struct json *
 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s,
                              struct json *params)
@@ -673,8 +831,9 @@ ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s,
     SHASH_FOR_EACH (node, json_object(monitor_requests)) {
         const struct ovsdb_table *table;
         struct ovsdb_jsonrpc_monitor_table *mt;
-        const struct json *columns_json, *select_json;
-        struct ovsdb_parser parser;
+        size_t allocated_columns;
+        const struct json *mr_value;
+        size_t i;
 
         table = ovsdb_get_table(s->remote->server->db, node->name);
         if (!table) {
@@ -685,55 +844,37 @@ ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s,
 
         mt = xzalloc(sizeof *mt);
         mt->table = table;
-        mt->select = OJMS_INITIAL | OJMS_INSERT | OJMS_DELETE | OJMS_MODIFY;
-        ovsdb_column_set_init(&mt->columns);
         shash_add(&m->tables, table->schema->name, mt);
 
-        ovsdb_parser_init(&parser, node->data, "table %s", node->name);
-        columns_json = ovsdb_parser_member(&parser, "columns",
-                                           OP_ARRAY | OP_OPTIONAL);
-        select_json = ovsdb_parser_member(&parser, "select",
-                                          OP_OBJECT | OP_OPTIONAL);
-        error = ovsdb_parser_finish(&parser);
-        if (error) {
-            goto error;
-        }
-
-        if (columns_json) {
-            error = ovsdb_column_set_from_json(columns_json, table,
-                                               &mt->columns);
-            if (error) {
-                goto error;
+        /* Parse columns. */
+        mr_value = node->data;
+        allocated_columns = 0;
+        if (mr_value->type == JSON_ARRAY) {
+            const struct json_array *array = &mr_value->u.array;
+
+            for (i = 0; i < array->n; i++) {
+                error = ovsdb_jsonrpc_parse_monitor_request(
+                    mt, array->elems[i], &allocated_columns);
+                if (error) {
+                    goto error;
+                }
             }
         } else {
-            struct shash_node *node;
-
-            SHASH_FOR_EACH (node, &table->schema->columns) {
-                const struct ovsdb_column *column = node->data;
-                if (column->index != OVSDB_COL_UUID) {
-                    ovsdb_column_set_add(&mt->columns, column);
-                }
+            error = ovsdb_jsonrpc_parse_monitor_request(
+                mt, mr_value, &allocated_columns);
+            if (error) {
+                goto error;
             }
         }
 
-        if (select_json) {
-            mt->select = 0;
-            ovsdb_parser_init(&parser, select_json, "table %s select",
-                              table->schema->name);
-            if (parse_bool(&parser, "initial", true)) {
-                mt->select |= OJMS_INITIAL;
-            }
-            if (parse_bool(&parser, "insert", true)) {
-                mt->select |= OJMS_INSERT;
-            }
-            if (parse_bool(&parser, "delete", true)) {
-                mt->select |= OJMS_DELETE;
-            }
-            if (parse_bool(&parser, "modify", true)) {
-                mt->select |= OJMS_MODIFY;
-            }
-            error = ovsdb_parser_finish(&parser);
-            if (error) {
+        /* Check for duplicate columns. */
+        qsort(mt->columns, mt->n_columns, sizeof *mt->columns,
+              compare_ovsdb_jsonrpc_monitor_column);
+        for (i = 1; i < mt->n_columns; i++) {
+            if (mt->columns[i].column == mt->columns[i - 1].column) {
+                error = ovsdb_syntax_error(mr_value, NULL, "column %s "
+                                           "mentioned more than once",
+                                           mt->columns[i].column->name);
                 goto error;
             }
         }
@@ -804,6 +945,7 @@ struct ovsdb_jsonrpc_monitor_aux {
 static bool
 ovsdb_jsonrpc_monitor_change_cb(const struct ovsdb_row *old,
                                 const struct ovsdb_row *new,
+                                const unsigned long int *changed,
                                 void *aux_)
 {
     struct ovsdb_jsonrpc_monitor_aux *aux = aux_;
@@ -838,17 +980,23 @@ ovsdb_jsonrpc_monitor_change_cb(const struct ovsdb_row *old,
 
     old_json = new_json = NULL;
     n_changed = 0;
-    for (i = 0; i < aux->mt->columns.n_columns; i++) {
-        const struct ovsdb_column *column = aux->mt->columns.columns[i];
-        unsigned int idx = column->index;
-        bool changed = false;
+    for (i = 0; i < aux->mt->n_columns; i++) {
+        const struct ovsdb_jsonrpc_monitor_column *c = &aux->mt->columns[i];
+        const struct ovsdb_column *column = c->column;
+        unsigned int idx = c->column->index;
+        bool column_changed = false;
+
+        if (!(type & c->select)) {
+            /* We don't care about this type of change for this particular
+             * column (but we will care about it for some other column). */
+            continue;
+        }
 
         if (type == OJMS_MODIFY) {
-            changed = !ovsdb_datum_equals(&old->fields[idx],
-                                          &new->fields[idx], &column->type);
-            n_changed += changed;
+            column_changed = bitmap_is_set(changed, idx);
+            n_changed += column_changed;
         }
-        if (changed || type == OJMS_DELETE) {
+        if (column_changed || type == OJMS_DELETE) {
             if (!old_json) {
                 old_json = json_object_create();
             }
@@ -915,7 +1063,8 @@ ovsdb_jsonrpc_monitor_init_aux(struct ovsdb_jsonrpc_monitor_aux *aux,
 
 static struct ovsdb_error *
 ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica *replica,
-                             const struct ovsdb_txn *txn, bool durable UNUSED)
+                             const struct ovsdb_txn *txn,
+                             bool durable OVS_UNUSED)
 {
     struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
     struct ovsdb_jsonrpc_monitor_aux aux;
@@ -950,7 +1099,7 @@ ovsdb_jsonrpc_monitor_get_initial(const struct ovsdb_jsonrpc_monitor *m)
 
             HMAP_FOR_EACH (row, struct ovsdb_row, hmap_node,
                            &mt->table->rows) {
-                ovsdb_jsonrpc_monitor_change_cb(NULL, row, &aux);
+                ovsdb_jsonrpc_monitor_change_cb(NULL, row, NULL, &aux);
             }
         }
     }
@@ -966,7 +1115,7 @@ ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *replica)
     json_destroy(m->monitor_id);
     SHASH_FOR_EACH (node, &m->tables) {
         struct ovsdb_jsonrpc_monitor_table *mt = node->data;
-        ovsdb_column_set_destroy(&mt->columns);
+        free(mt->columns);
         free(mt);
     }
     shash_destroy(&m->tables);