ovsdb: Add support for weak references.
[sliver-openvswitch.git] / ovsdb / transaction.c
index 8a10d1e..2e4c73a 100644 (file)
@@ -160,7 +160,7 @@ ovsdb_txn_adjust_atom_refs(struct ovsdb_txn *txn,
     const struct ovsdb_table *table;
     unsigned int i;
 
-    if (base->type != OVSDB_TYPE_UUID || !base->u.uuid.refTable) {
+    if (!ovsdb_base_type_is_strong_ref(base)) {
         return NULL;
     }
 
@@ -270,6 +270,142 @@ ovsdb_txn_row_commit(struct ovsdb_txn *txn OVS_UNUSED,
     return NULL;
 }
 
+static void
+add_weak_ref(struct ovsdb_txn *txn,
+             const struct ovsdb_row *src_, const struct ovsdb_row *dst_)
+{
+    struct ovsdb_row *src = (struct ovsdb_row *) src_;
+    struct ovsdb_row *dst = (struct ovsdb_row *) dst_;
+    struct ovsdb_weak_ref *weak;
+
+    if (src == dst) {
+        return;
+    }
+
+    dst = ovsdb_txn_row_modify(txn, dst);
+
+    if (!list_is_empty(&dst->dst_refs)) {
+        /* Omit duplicates. */
+        weak = CONTAINER_OF(list_back(&dst->dst_refs),
+                            struct ovsdb_weak_ref, dst_node);
+        if (weak->src == src) {
+            return;
+        }
+    }
+
+    weak = xmalloc(sizeof *weak);
+    weak->src = src;
+    list_push_back(&dst->dst_refs, &weak->dst_node);
+    list_push_back(&src->src_refs, &weak->src_node);
+}
+
+static struct ovsdb_error * WARN_UNUSED_RESULT
+assess_weak_refs(struct ovsdb_txn *txn, struct ovsdb_txn_row *txn_row)
+{
+    struct ovsdb_table *table;
+    struct shash_node *node;
+
+    if (txn_row->old) {
+        /* Mark rows that have weak references to 'txn_row' as modified, so
+         * that their weak references will get reassessed. */
+        struct ovsdb_weak_ref *weak, *next;
+
+        LIST_FOR_EACH_SAFE (weak, next, struct ovsdb_weak_ref, dst_node,
+                            &txn_row->old->dst_refs) {
+            if (!weak->src->txn_row) {
+                ovsdb_txn_row_modify(txn, weak->src);
+            }
+        }
+    }
+
+    if (!txn_row->new) {
+        /* We don't have to do anything about references that originate at
+         * 'txn_row', because ovsdb_row_destroy() will remove those weak
+         * references. */
+        return NULL;
+    }
+
+    table = txn_row->new->table;
+    SHASH_FOR_EACH (node, &table->schema->columns) {
+        const struct ovsdb_column *column = node->data;
+        struct ovsdb_datum *datum = &txn_row->new->fields[column->index];
+        unsigned int orig_n, i;
+        bool zero = false;
+
+        orig_n = datum->n;
+
+        if (ovsdb_base_type_is_weak_ref(&column->type.key)) {
+            for (i = 0; i < datum->n; ) {
+                const struct ovsdb_row *row;
+
+                row = ovsdb_table_get_row(column->type.key.u.uuid.refTable,
+                                          &datum->keys[i].uuid);
+                if (row) {
+                    add_weak_ref(txn, txn_row->new, row);
+                    i++;
+                } else {
+                    if (uuid_is_zero(&datum->keys[i].uuid)) {
+                        zero = true;
+                    }
+                    ovsdb_datum_remove_unsafe(datum, i, &column->type);
+                }
+            }
+        }
+
+        if (ovsdb_base_type_is_weak_ref(&column->type.value)) {
+            for (i = 0; i < datum->n; ) {
+                const struct ovsdb_row *row;
+
+                row = ovsdb_table_get_row(column->type.value.u.uuid.refTable,
+                                          &datum->values[i].uuid);
+                if (row) {
+                    add_weak_ref(txn, txn_row->new, row);
+                    i++;
+                } else {
+                    if (uuid_is_zero(&datum->values[i].uuid)) {
+                        zero = true;
+                    }
+                    ovsdb_datum_remove_unsafe(datum, i, &column->type);
+                }
+            }
+        }
+
+        if (datum->n != orig_n) {
+            bitmap_set1(txn_row->changed, column->index);
+            ovsdb_datum_sort_assert(datum, column->type.key.type);
+            if (datum->n < column->type.n_min) {
+                const struct uuid *row_uuid = ovsdb_row_get_uuid(txn_row->new);
+                if (zero && !txn_row->old) {
+                    return ovsdb_error(
+                        "constraint violation",
+                        "Weak reference column \"%s\" in \"%s\" row "UUID_FMT
+                        " (inserted within this transaction) contained "
+                        "all-zeros UUID (probably as the default value for "
+                        "this column) but deleting this value caused a "
+                        "constraint volation because this column is not "
+                        "allowed to be empty.", column->name,
+                        table->schema->name, UUID_ARGS(row_uuid));
+                } else {
+                    return ovsdb_error(
+                        "constraint violation",
+                        "Deletion of %u weak reference(s) to deleted (or "
+                        "never-existing) rows from column \"%s\" in \"%s\" "
+                        "row "UUID_FMT" %scaused this column to become empty, "
+                        "but constraints on this column disallow an "
+                        "empty column.",
+                        orig_n - datum->n, column->name, table->schema->name,
+                        UUID_ARGS(row_uuid),
+                        (txn_row->old
+                         ? ""
+                         : "(inserted within this transaction) "));
+                }
+            }
+        }
+    }
+
+    return NULL;
+}
+
 static struct ovsdb_error * WARN_UNUSED_RESULT
 determine_changes(struct ovsdb_txn *txn, struct ovsdb_txn_row *txn_row)
 {
@@ -330,6 +466,14 @@ ovsdb_txn_commit(struct ovsdb_txn *txn, bool durable)
         return error;
     }
 
+    /* Check reference counts and remove bad reference for "weak" referential
+     * integrity. */
+    error = for_each_txn_row(txn, assess_weak_refs);
+    if (error) {
+        ovsdb_txn_abort(txn);
+        return error;
+    }
+
     /* Send the commit to each replica. */
     LIST_FOR_EACH (replica, struct ovsdb_replica, node, &txn->db->replicas) {
         error = (replica->class->commit)(replica, txn, durable);