Replace most uses of assert by ovs_assert.
[sliver-openvswitch.git] / ovsdb / transaction.c
1 /* Copyright (c) 2009, 2010, 2011, 2012 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "transaction.h"
19
20 #include "bitmap.h"
21 #include "dynamic-string.h"
22 #include "hash.h"
23 #include "hmap.h"
24 #include "json.h"
25 #include "list.h"
26 #include "ovsdb-error.h"
27 #include "ovsdb.h"
28 #include "row.h"
29 #include "table.h"
30 #include "uuid.h"
31
32 struct ovsdb_txn {
33     struct ovsdb *db;
34     struct list txn_tables;     /* Contains "struct ovsdb_txn_table"s. */
35     struct ds comment;
36 };
37
38 /* A table modified by a transaction. */
39 struct ovsdb_txn_table {
40     struct list node;           /* Element in ovsdb_txn's txn_tables list. */
41     struct ovsdb_table *table;
42     struct hmap txn_rows;       /* Contains "struct ovsdb_txn_row"s. */
43
44     /* This has the same form as the 'indexes' member of struct ovsdb_table,
45      * but it is only used or updated at transaction commit time, from
46      * check_index_uniqueness(). */
47     struct hmap *txn_indexes;
48
49     /* Used by for_each_txn_row(). */
50     unsigned int serial;        /* Serial number of in-progress iteration. */
51     unsigned int n_processed;   /* Number of rows processed. */
52 };
53
54 /* A row modified by the transaction:
55  *
56  *      - A row added by a transaction will have null 'old' and non-null 'new'.
57  *
58  *      - A row deleted by a transaction will have non-null 'old' and null
59  *        'new'.
60  *
61  *      - A row modified by a transaction will have non-null 'old' and 'new'.
62  *
63  *      - 'old' and 'new' both null indicates that a row was added then deleted
64  *        within a single transaction.  Most of the time we instead delete the
65  *        ovsdb_txn_row entirely, but inside a for_each_txn_row() callback
66  *        there are restrictions that sometimes mean we have to leave the
67  *        ovsdb_txn_row in place.
68  */
69 struct ovsdb_txn_row {
70     struct hmap_node hmap_node; /* In ovsdb_txn_table's txn_rows hmap. */
71     struct ovsdb_row *old;      /* The old row. */
72     struct ovsdb_row *new;      /* The new row. */
73     size_t n_refs;              /* Number of remaining references. */
74
75     /* These members are the same as the corresponding members of 'old' or
76      * 'new'.  They are present here for convenience and because occasionally
77      * there can be an ovsdb_txn_row where both 'old' and 'new' are NULL. */
78     struct uuid uuid;
79     struct ovsdb_table *table;
80
81     /* Used by for_each_txn_row(). */
82     unsigned int serial;        /* Serial number of in-progress commit. */
83
84     unsigned long changed[];    /* Bits set to 1 for columns that changed. */
85 };
86
87 static struct ovsdb_error * WARN_UNUSED_RESULT
88 delete_garbage_row(struct ovsdb_txn *txn, struct ovsdb_txn_row *r);
89 static void ovsdb_txn_row_prefree(struct ovsdb_txn_row *);
90 static struct ovsdb_error * WARN_UNUSED_RESULT
91 for_each_txn_row(struct ovsdb_txn *txn,
92                       struct ovsdb_error *(*)(struct ovsdb_txn *,
93                                               struct ovsdb_txn_row *));
94
95 /* Used by for_each_txn_row() to track tables and rows that have been
96  * processed.  */
97 static unsigned int serial;
98
99 struct ovsdb_txn *
100 ovsdb_txn_create(struct ovsdb *db)
101 {
102     struct ovsdb_txn *txn = xmalloc(sizeof *txn);
103     txn->db = db;
104     list_init(&txn->txn_tables);
105     ds_init(&txn->comment);
106     return txn;
107 }
108
109 static void
110 ovsdb_txn_free(struct ovsdb_txn *txn)
111 {
112     ovs_assert(list_is_empty(&txn->txn_tables));
113     ds_destroy(&txn->comment);
114     free(txn);
115 }
116
117 static struct ovsdb_error *
118 ovsdb_txn_row_abort(struct ovsdb_txn *txn OVS_UNUSED,
119                     struct ovsdb_txn_row *txn_row)
120 {
121     struct ovsdb_row *old = txn_row->old;
122     struct ovsdb_row *new = txn_row->new;
123
124     ovsdb_txn_row_prefree(txn_row);
125     if (!old) {
126         if (new) {
127             hmap_remove(&new->table->rows, &new->hmap_node);
128         }
129     } else if (!new) {
130         hmap_insert(&old->table->rows, &old->hmap_node, ovsdb_row_hash(old));
131     } else {
132         hmap_replace(&new->table->rows, &new->hmap_node, &old->hmap_node);
133     }
134     ovsdb_row_destroy(new);
135     free(txn_row);
136
137     return NULL;
138 }
139
140 /* Returns the offset in bytes from the start of an ovsdb_row for 'table' to
141  * the hmap_node for the index numbered 'i'. */
142 static size_t
143 ovsdb_row_index_offset__(const struct ovsdb_table *table, size_t i)
144 {
145     size_t n_fields = shash_count(&table->schema->columns);
146     return (offsetof(struct ovsdb_row, fields)
147             + n_fields * sizeof(struct ovsdb_datum)
148             + i * sizeof(struct hmap_node));
149 }
150
151 /* Returns the hmap_node in 'row' for the index numbered 'i'. */
152 static struct hmap_node *
153 ovsdb_row_get_index_node(struct ovsdb_row *row, size_t i)
154 {
155     return (void *) ((char *) row + ovsdb_row_index_offset__(row->table, i));
156 }
157
158 /* Returns the ovsdb_row given 'index_node', which is a pointer to that row's
159  * hmap_node for the index numbered 'i' within 'table'. */
160 static struct ovsdb_row *
161 ovsdb_row_from_index_node(struct hmap_node *index_node,
162                           const struct ovsdb_table *table, size_t i)
163 {
164     return (void *) ((char *) index_node - ovsdb_row_index_offset__(table, i));
165 }
166
167 void
168 ovsdb_txn_abort(struct ovsdb_txn *txn)
169 {
170     ovsdb_error_assert(for_each_txn_row(txn, ovsdb_txn_row_abort));
171     ovsdb_txn_free(txn);
172 }
173
174 static struct ovsdb_txn_row *
175 find_txn_row(const struct ovsdb_table *table, const struct uuid *uuid)
176 {
177     struct ovsdb_txn_row *txn_row;
178
179     if (!table->txn_table) {
180         return NULL;
181     }
182
183     HMAP_FOR_EACH_WITH_HASH (txn_row, hmap_node,
184                              uuid_hash(uuid), &table->txn_table->txn_rows) {
185         if (uuid_equals(uuid, &txn_row->uuid)) {
186             return txn_row;
187         }
188     }
189
190     return NULL;
191 }
192
193 static struct ovsdb_txn_row *
194 find_or_make_txn_row(struct ovsdb_txn *txn, const struct ovsdb_table *table,
195                      const struct uuid *uuid)
196 {
197     struct ovsdb_txn_row *txn_row = find_txn_row(table, uuid);
198     if (!txn_row) {
199         const struct ovsdb_row *row = ovsdb_table_get_row(table, uuid);
200         if (row) {
201             txn_row = ovsdb_txn_row_modify(txn, row)->txn_row;
202         }
203     }
204     return txn_row;
205 }
206
207 static struct ovsdb_error * WARN_UNUSED_RESULT
208 ovsdb_txn_adjust_atom_refs(struct ovsdb_txn *txn, const struct ovsdb_row *r,
209                            const struct ovsdb_column *c,
210                            const struct ovsdb_base_type *base,
211                            const union ovsdb_atom *atoms, unsigned int n,
212                            int delta)
213 {
214     const struct ovsdb_table *table;
215     unsigned int i;
216
217     if (!ovsdb_base_type_is_strong_ref(base)) {
218         return NULL;
219     }
220
221     table = base->u.uuid.refTable;
222     for (i = 0; i < n; i++) {
223         const struct uuid *uuid = &atoms[i].uuid;
224         struct ovsdb_txn_row *txn_row;
225
226         if (uuid_equals(uuid, ovsdb_row_get_uuid(r))) {
227             /* Self-references don't count. */
228             continue;
229         }
230
231         txn_row = find_or_make_txn_row(txn, table, uuid);
232         if (!txn_row) {
233             return ovsdb_error("referential integrity violation",
234                                "Table %s column %s row "UUID_FMT" "
235                                "references nonexistent row "UUID_FMT" in "
236                                "table %s.",
237                                r->table->schema->name, c->name,
238                                UUID_ARGS(ovsdb_row_get_uuid(r)),
239                                UUID_ARGS(uuid), table->schema->name);
240         }
241         txn_row->n_refs += delta;
242     }
243
244     return NULL;
245 }
246
247 static struct ovsdb_error * WARN_UNUSED_RESULT
248 ovsdb_txn_adjust_row_refs(struct ovsdb_txn *txn, const struct ovsdb_row *r,
249                           const struct ovsdb_column *column, int delta)
250 {
251     const struct ovsdb_datum *field = &r->fields[column->index];
252     struct ovsdb_error *error;
253
254     error = ovsdb_txn_adjust_atom_refs(txn, r, column, &column->type.key,
255                                        field->keys, field->n, delta);
256     if (!error) {
257         error = ovsdb_txn_adjust_atom_refs(txn, r, column, &column->type.value,
258                                            field->values, field->n, delta);
259     }
260     return error;
261 }
262
263 static struct ovsdb_error * WARN_UNUSED_RESULT
264 update_row_ref_count(struct ovsdb_txn *txn, struct ovsdb_txn_row *r)
265 {
266     struct ovsdb_table *table = r->table;
267     struct shash_node *node;
268
269     SHASH_FOR_EACH (node, &table->schema->columns) {
270         const struct ovsdb_column *column = node->data;
271         struct ovsdb_error *error;
272
273         if (r->old) {
274             error = ovsdb_txn_adjust_row_refs(txn, r->old, column, -1);
275             if (error) {
276                 return OVSDB_WRAP_BUG("error decreasing refcount", error);
277             }
278         }
279         if (r->new) {
280             error = ovsdb_txn_adjust_row_refs(txn, r->new, column, 1);
281             if (error) {
282                 return error;
283             }
284         }
285     }
286
287     return NULL;
288 }
289
290 static struct ovsdb_error * WARN_UNUSED_RESULT
291 check_ref_count(struct ovsdb_txn *txn OVS_UNUSED, struct ovsdb_txn_row *r)
292 {
293     if (r->new || !r->n_refs) {
294         return NULL;
295     } else {
296         return ovsdb_error("referential integrity violation",
297                            "cannot delete %s row "UUID_FMT" because "
298                            "of %zu remaining reference(s)",
299                            r->table->schema->name, UUID_ARGS(&r->uuid),
300                            r->n_refs);
301     }
302 }
303
304 static struct ovsdb_error * WARN_UNUSED_RESULT
305 delete_row_refs(struct ovsdb_txn *txn, const struct ovsdb_row *row,
306                 const struct ovsdb_base_type *base,
307                 const union ovsdb_atom *atoms, unsigned int n)
308 {
309     const struct ovsdb_table *table;
310     unsigned int i;
311
312     if (!ovsdb_base_type_is_strong_ref(base)) {
313         return NULL;
314     }
315
316     table = base->u.uuid.refTable;
317     for (i = 0; i < n; i++) {
318         const struct uuid *uuid = &atoms[i].uuid;
319         struct ovsdb_txn_row *txn_row;
320
321         if (uuid_equals(uuid, ovsdb_row_get_uuid(row))) {
322             /* Self-references don't count. */
323             continue;
324         }
325
326         txn_row = find_or_make_txn_row(txn, table, uuid);
327         if (!txn_row) {
328             return OVSDB_BUG("strong ref target missing");
329         } else if (!txn_row->n_refs) {
330             return OVSDB_BUG("strong ref target has zero n_refs");
331         } else if (!txn_row->new) {
332             return OVSDB_BUG("deleted strong ref target");
333         }
334
335         if (--txn_row->n_refs == 0) {
336             struct ovsdb_error *error = delete_garbage_row(txn, txn_row);
337             if (error) {
338                 return error;
339             }
340         }
341     }
342
343     return NULL;
344 }
345
346 static struct ovsdb_error * WARN_UNUSED_RESULT
347 delete_garbage_row(struct ovsdb_txn *txn, struct ovsdb_txn_row *txn_row)
348 {
349     struct shash_node *node;
350     struct ovsdb_row *row;
351
352     if (txn_row->table->schema->is_root) {
353         return NULL;
354     }
355
356     row = txn_row->new;
357     txn_row->new = NULL;
358     hmap_remove(&txn_row->table->rows, &row->hmap_node);
359     SHASH_FOR_EACH (node, &txn_row->table->schema->columns) {
360         const struct ovsdb_column *column = node->data;
361         const struct ovsdb_datum *field = &row->fields[column->index];
362         struct ovsdb_error *error;
363
364         error = delete_row_refs(txn, row,
365                                 &column->type.key, field->keys, field->n);
366         if (error) {
367             return error;
368         }
369
370         error = delete_row_refs(txn, row,
371                                 &column->type.value, field->values, field->n);
372         if (error) {
373             return error;
374         }
375     }
376     ovsdb_row_destroy(row);
377
378     return NULL;
379 }
380
381 static struct ovsdb_error * WARN_UNUSED_RESULT
382 collect_garbage(struct ovsdb_txn *txn, struct ovsdb_txn_row *txn_row)
383 {
384     if (txn_row->new && !txn_row->n_refs) {
385         return delete_garbage_row(txn, txn_row);
386     }
387     return NULL;
388 }
389
390 static struct ovsdb_error * WARN_UNUSED_RESULT
391 update_ref_counts(struct ovsdb_txn *txn)
392 {
393     struct ovsdb_error *error;
394
395     error = for_each_txn_row(txn, update_row_ref_count);
396     if (error) {
397         return error;
398     }
399
400     return for_each_txn_row(txn, check_ref_count);
401 }
402
403 static struct ovsdb_error *
404 ovsdb_txn_row_commit(struct ovsdb_txn *txn OVS_UNUSED,
405                      struct ovsdb_txn_row *txn_row)
406 {
407     size_t n_indexes = txn_row->table->schema->n_indexes;
408
409     if (txn_row->old) {
410         size_t i;
411
412         for (i = 0; i < n_indexes; i++) {
413             struct hmap_node *node = ovsdb_row_get_index_node(txn_row->old, i);
414             hmap_remove(&txn_row->table->indexes[i], node);
415         }
416     }
417     if (txn_row->new) {
418         size_t i;
419
420         for (i = 0; i < n_indexes; i++) {
421             struct hmap_node *node = ovsdb_row_get_index_node(txn_row->new, i);
422             hmap_insert(&txn_row->table->indexes[i], node, node->hash);
423         }
424     }
425
426     ovsdb_txn_row_prefree(txn_row);
427     if (txn_row->new) {
428         txn_row->new->n_refs = txn_row->n_refs;
429     }
430     ovsdb_row_destroy(txn_row->old);
431     free(txn_row);
432
433     return NULL;
434 }
435
436 static void
437 add_weak_ref(struct ovsdb_txn *txn,
438              const struct ovsdb_row *src_, const struct ovsdb_row *dst_)
439 {
440     struct ovsdb_row *src = CONST_CAST(struct ovsdb_row *, src_);
441     struct ovsdb_row *dst = CONST_CAST(struct ovsdb_row *, dst_);
442     struct ovsdb_weak_ref *weak;
443
444     if (src == dst) {
445         return;
446     }
447
448     dst = ovsdb_txn_row_modify(txn, dst);
449
450     if (!list_is_empty(&dst->dst_refs)) {
451         /* Omit duplicates. */
452         weak = CONTAINER_OF(list_back(&dst->dst_refs),
453                             struct ovsdb_weak_ref, dst_node);
454         if (weak->src == src) {
455             return;
456         }
457     }
458
459     weak = xmalloc(sizeof *weak);
460     weak->src = src;
461     list_push_back(&dst->dst_refs, &weak->dst_node);
462     list_push_back(&src->src_refs, &weak->src_node);
463 }
464
465 static struct ovsdb_error * WARN_UNUSED_RESULT
466 assess_weak_refs(struct ovsdb_txn *txn, struct ovsdb_txn_row *txn_row)
467 {
468     struct ovsdb_table *table;
469     struct shash_node *node;
470
471     if (txn_row->old) {
472         /* Mark rows that have weak references to 'txn_row' as modified, so
473          * that their weak references will get reassessed. */
474         struct ovsdb_weak_ref *weak, *next;
475
476         LIST_FOR_EACH_SAFE (weak, next, dst_node, &txn_row->old->dst_refs) {
477             if (!weak->src->txn_row) {
478                 ovsdb_txn_row_modify(txn, weak->src);
479             }
480         }
481     }
482
483     if (!txn_row->new) {
484         /* We don't have to do anything about references that originate at
485          * 'txn_row', because ovsdb_row_destroy() will remove those weak
486          * references. */
487         return NULL;
488     }
489
490     table = txn_row->table;
491     SHASH_FOR_EACH (node, &table->schema->columns) {
492         const struct ovsdb_column *column = node->data;
493         struct ovsdb_datum *datum = &txn_row->new->fields[column->index];
494         unsigned int orig_n, i;
495         bool zero = false;
496
497         orig_n = datum->n;
498
499         if (ovsdb_base_type_is_weak_ref(&column->type.key)) {
500             for (i = 0; i < datum->n; ) {
501                 const struct ovsdb_row *row;
502
503                 row = ovsdb_table_get_row(column->type.key.u.uuid.refTable,
504                                           &datum->keys[i].uuid);
505                 if (row) {
506                     add_weak_ref(txn, txn_row->new, row);
507                     i++;
508                 } else {
509                     if (uuid_is_zero(&datum->keys[i].uuid)) {
510                         zero = true;
511                     }
512                     ovsdb_datum_remove_unsafe(datum, i, &column->type);
513                 }
514             }
515         }
516
517         if (ovsdb_base_type_is_weak_ref(&column->type.value)) {
518             for (i = 0; i < datum->n; ) {
519                 const struct ovsdb_row *row;
520
521                 row = ovsdb_table_get_row(column->type.value.u.uuid.refTable,
522                                           &datum->values[i].uuid);
523                 if (row) {
524                     add_weak_ref(txn, txn_row->new, row);
525                     i++;
526                 } else {
527                     if (uuid_is_zero(&datum->values[i].uuid)) {
528                         zero = true;
529                     }
530                     ovsdb_datum_remove_unsafe(datum, i, &column->type);
531                 }
532             }
533         }
534
535         if (datum->n != orig_n) {
536             bitmap_set1(txn_row->changed, column->index);
537             ovsdb_datum_sort_assert(datum, column->type.key.type);
538             if (datum->n < column->type.n_min) {
539                 const struct uuid *row_uuid = ovsdb_row_get_uuid(txn_row->new);
540                 if (zero && !txn_row->old) {
541                     return ovsdb_error(
542                         "constraint violation",
543                         "Weak reference column \"%s\" in \"%s\" row "UUID_FMT
544                         " (inserted within this transaction) contained "
545                         "all-zeros UUID (probably as the default value for "
546                         "this column) but deleting this value caused a "
547                         "constraint volation because this column is not "
548                         "allowed to be empty.", column->name,
549                         table->schema->name, UUID_ARGS(row_uuid));
550                 } else {
551                     return ovsdb_error(
552                         "constraint violation",
553                         "Deletion of %u weak reference(s) to deleted (or "
554                         "never-existing) rows from column \"%s\" in \"%s\" "
555                         "row "UUID_FMT" %scaused this column to become empty, "
556                         "but constraints on this column disallow an "
557                         "empty column.",
558                         orig_n - datum->n, column->name, table->schema->name,
559                         UUID_ARGS(row_uuid),
560                         (txn_row->old
561                          ? ""
562                          : "(inserted within this transaction) "));
563                 }
564             }
565         }
566     }
567
568     return NULL;
569 }
570
571 static struct ovsdb_error * WARN_UNUSED_RESULT
572 determine_changes(struct ovsdb_txn *txn, struct ovsdb_txn_row *txn_row)
573 {
574     struct ovsdb_table *table = txn_row->table;
575
576     if (txn_row->old && txn_row->new) {
577         struct shash_node *node;
578         bool changed = false;
579
580         SHASH_FOR_EACH (node, &table->schema->columns) {
581             const struct ovsdb_column *column = node->data;
582             const struct ovsdb_type *type = &column->type;
583             unsigned int idx = column->index;
584
585             if (!ovsdb_datum_equals(&txn_row->old->fields[idx],
586                                     &txn_row->new->fields[idx],
587                                     type)) {
588                 bitmap_set1(txn_row->changed, idx);
589                 changed = true;
590             }
591         }
592
593         if (!changed) {
594             /* Nothing actually changed in this row, so drop it. */
595             ovsdb_txn_row_abort(txn, txn_row);
596         }
597     } else {
598         bitmap_set_multiple(txn_row->changed, 0,
599                             shash_count(&table->schema->columns), 1);
600     }
601
602     return NULL;
603 }
604
605 static struct ovsdb_error * WARN_UNUSED_RESULT
606 check_max_rows(struct ovsdb_txn *txn)
607 {
608     struct ovsdb_txn_table *t;
609
610     LIST_FOR_EACH (t, node, &txn->txn_tables) {
611         size_t n_rows = hmap_count(&t->table->rows);
612         unsigned int max_rows = t->table->schema->max_rows;
613
614         if (n_rows > max_rows) {
615             return ovsdb_error("constraint violation",
616                                "transaction causes \"%s\" table to contain "
617                                "%zu rows, greater than the schema-defined "
618                                "limit of %u row(s)",
619                                t->table->schema->name, n_rows, max_rows);
620         }
621     }
622
623     return NULL;
624 }
625
626 static struct ovsdb_row *
627 ovsdb_index_search(struct hmap *index, struct ovsdb_row *row, size_t i,
628                    uint32_t hash)
629 {
630     const struct ovsdb_table *table = row->table;
631     const struct ovsdb_column_set *columns = &table->schema->indexes[i];
632     struct hmap_node *node;
633
634     for (node = hmap_first_with_hash(index, hash); node;
635          node = hmap_next_with_hash(node)) {
636         struct ovsdb_row *irow = ovsdb_row_from_index_node(node, table, i);
637         if (ovsdb_row_equal_columns(row, irow, columns)) {
638             return irow;
639         }
640     }
641
642     return NULL;
643 }
644
645 static void
646 duplicate_index_row__(const struct ovsdb_column_set *index,
647                       const struct ovsdb_row *row,
648                       const char *title,
649                       struct ds *out)
650 {
651     size_t n_columns = shash_count(&row->table->schema->columns);
652
653     ds_put_format(out, "%s row, with UUID "UUID_FMT", ",
654                   title, UUID_ARGS(ovsdb_row_get_uuid(row)));
655     if (!row->txn_row
656         || bitmap_scan(row->txn_row->changed, 0, n_columns) == n_columns) {
657         ds_put_cstr(out, "existed in the database before this "
658                     "transaction and was not modified by the transaction.");
659     } else if (!row->txn_row->old) {
660         ds_put_cstr(out, "was inserted by this transaction.");
661     } else if (ovsdb_row_equal_columns(row->txn_row->old,
662                                        row->txn_row->new, index)) {
663         ds_put_cstr(out, "existed in the database before this "
664                     "transaction, which modified some of the row's columns "
665                     "but not any columns in this index.");
666     } else {
667         ds_put_cstr(out, "had the following index values before the "
668                     "transaction: ");
669         ovsdb_row_columns_to_string(row->txn_row->old, index, out);
670         ds_put_char(out, '.');
671     }
672 }
673
674 static struct ovsdb_error * WARN_UNUSED_RESULT
675 duplicate_index_row(const struct ovsdb_column_set *index,
676                     const struct ovsdb_row *a,
677                     const struct ovsdb_row *b)
678 {
679     struct ovsdb_column_set all_columns;
680     struct ovsdb_error *error;
681     char *index_s;
682     struct ds s;
683
684     /* Put 'a' and 'b' in a predictable order to make error messages
685      * reproducible for testing. */
686     ovsdb_column_set_init(&all_columns);
687     ovsdb_column_set_add_all(&all_columns, a->table);
688     if (ovsdb_row_compare_columns_3way(a, b, &all_columns) < 0) {
689         const struct ovsdb_row *tmp = a;
690         a = b;
691         b = tmp;
692     }
693     ovsdb_column_set_destroy(&all_columns);
694
695     index_s = ovsdb_column_set_to_string(index);
696
697     ds_init(&s);
698     ds_put_format(&s, "Transaction causes multiple rows in \"%s\" table to "
699                   "have identical values (", a->table->schema->name);
700     ovsdb_row_columns_to_string(a, index, &s);
701     ds_put_format(&s, ") for index on %s.  ", index_s);
702     duplicate_index_row__(index, a, "First", &s);
703     ds_put_cstr(&s, "  ");
704     duplicate_index_row__(index, b, "Second", &s);
705
706     free(index_s);
707
708     error = ovsdb_error("constraint violation", "%s", ds_cstr(&s));
709     ds_destroy(&s);
710     return error;
711 }
712
713 static struct ovsdb_error * WARN_UNUSED_RESULT
714 check_index_uniqueness(struct ovsdb_txn *txn OVS_UNUSED,
715                        struct ovsdb_txn_row *txn_row)
716 {
717     struct ovsdb_txn_table *txn_table = txn_row->table->txn_table;
718     struct ovsdb_table *table = txn_row->table;
719     struct ovsdb_row *row = txn_row->new;
720     size_t i;
721
722     if (!row) {
723         return NULL;
724     }
725
726     for (i = 0; i < table->schema->n_indexes; i++) {
727         const struct ovsdb_column_set *index = &table->schema->indexes[i];
728         struct ovsdb_row *irow;
729         uint32_t hash;
730
731         hash = ovsdb_row_hash_columns(row, index, 0);
732         irow = ovsdb_index_search(&txn_table->txn_indexes[i], row, i, hash);
733         if (irow) {
734             return duplicate_index_row(index, irow, row);
735         }
736
737         irow = ovsdb_index_search(&table->indexes[i], row, i, hash);
738         if (irow && !irow->txn_row) {
739             return duplicate_index_row(index, irow, row);
740         }
741
742         hmap_insert(&txn_table->txn_indexes[i],
743                     ovsdb_row_get_index_node(row, i), hash);
744     }
745
746     return NULL;
747 }
748
749 struct ovsdb_error *
750 ovsdb_txn_commit(struct ovsdb_txn *txn, bool durable)
751 {
752     struct ovsdb_replica *replica;
753     struct ovsdb_error *error;
754
755     /* Figure out what actually changed, and abort early if the transaction
756      * was really a no-op. */
757     error = for_each_txn_row(txn, determine_changes);
758     if (error) {
759         return OVSDB_WRAP_BUG("can't happen", error);
760     }
761     if (list_is_empty(&txn->txn_tables)) {
762         ovsdb_txn_abort(txn);
763         return NULL;
764     }
765
766     /* Update reference counts and check referential integrity. */
767     error = update_ref_counts(txn);
768     if (error) {
769         ovsdb_txn_abort(txn);
770         return error;
771     }
772
773     /* Delete unreferenced, non-root rows. */
774     error = for_each_txn_row(txn, collect_garbage);
775     if (error) {
776         ovsdb_txn_abort(txn);
777         return OVSDB_WRAP_BUG("can't happen", error);
778     }
779
780     /* Check maximum rows table constraints. */
781     error = check_max_rows(txn);
782     if (error) {
783         ovsdb_txn_abort(txn);
784         return error;
785     }
786
787     /* Check reference counts and remove bad references for "weak" referential
788      * integrity. */
789     error = for_each_txn_row(txn, assess_weak_refs);
790     if (error) {
791         ovsdb_txn_abort(txn);
792         return error;
793     }
794
795     /* Verify that the indexes will still be unique post-transaction. */
796     error = for_each_txn_row(txn, check_index_uniqueness);
797     if (error) {
798         ovsdb_txn_abort(txn);
799         return error;
800     }
801
802     /* Send the commit to each replica. */
803     LIST_FOR_EACH (replica, node, &txn->db->replicas) {
804         error = (replica->class->commit)(replica, txn, durable);
805         if (error) {
806             /* We don't support two-phase commit so only the first replica is
807              * allowed to report an error. */
808             ovs_assert(&replica->node == txn->db->replicas.next);
809
810             ovsdb_txn_abort(txn);
811             return error;
812         }
813     }
814
815     /* Finalize commit. */
816     txn->db->run_triggers = true;
817     ovsdb_error_assert(for_each_txn_row(txn, ovsdb_txn_row_commit));
818     ovsdb_txn_free(txn);
819
820     return NULL;
821 }
822
823 void
824 ovsdb_txn_for_each_change(const struct ovsdb_txn *txn,
825                           ovsdb_txn_row_cb_func *cb, void *aux)
826 {
827     struct ovsdb_txn_table *t;
828     struct ovsdb_txn_row *r;
829
830     LIST_FOR_EACH (t, node, &txn->txn_tables) {
831         HMAP_FOR_EACH (r, hmap_node, &t->txn_rows) {
832             if ((r->old || r->new) && !cb(r->old, r->new, r->changed, aux)) {
833                 break;
834             }
835         }
836    }
837 }
838
839 static struct ovsdb_txn_table *
840 ovsdb_txn_create_txn_table(struct ovsdb_txn *txn, struct ovsdb_table *table)
841 {
842     if (!table->txn_table) {
843         struct ovsdb_txn_table *txn_table;
844         size_t i;
845
846         table->txn_table = txn_table = xmalloc(sizeof *table->txn_table);
847         txn_table->table = table;
848         hmap_init(&txn_table->txn_rows);
849         txn_table->serial = serial - 1;
850         txn_table->txn_indexes = xmalloc(table->schema->n_indexes
851                                          * sizeof *txn_table->txn_indexes);
852         for (i = 0; i < table->schema->n_indexes; i++) {
853             hmap_init(&txn_table->txn_indexes[i]);
854         }
855         list_push_back(&txn->txn_tables, &txn_table->node);
856     }
857     return table->txn_table;
858 }
859
860 static struct ovsdb_txn_row *
861 ovsdb_txn_row_create(struct ovsdb_txn *txn, struct ovsdb_table *table,
862                      const struct ovsdb_row *old_, struct ovsdb_row *new)
863 {
864     const struct ovsdb_row *row = old_ ? old_ : new;
865     struct ovsdb_row *old = CONST_CAST(struct ovsdb_row *, old_);
866     size_t n_columns = shash_count(&table->schema->columns);
867     struct ovsdb_txn_table *txn_table;
868     struct ovsdb_txn_row *txn_row;
869
870     txn_row = xzalloc(offsetof(struct ovsdb_txn_row, changed)
871                       + bitmap_n_bytes(n_columns));
872     txn_row->uuid = *ovsdb_row_get_uuid(row);
873     txn_row->table = row->table;
874     txn_row->old = old;
875     txn_row->new = new;
876     txn_row->n_refs = old ? old->n_refs : 0;
877     txn_row->serial = serial - 1;
878
879     if (old) {
880         old->txn_row = txn_row;
881     }
882     if (new) {
883         new->txn_row = txn_row;
884     }
885
886     txn_table = ovsdb_txn_create_txn_table(txn, table);
887     hmap_insert(&txn_table->txn_rows, &txn_row->hmap_node,
888                 ovsdb_row_hash(old ? old : new));
889
890     return txn_row;
891 }
892
893 struct ovsdb_row *
894 ovsdb_txn_row_modify(struct ovsdb_txn *txn, const struct ovsdb_row *ro_row_)
895 {
896     struct ovsdb_row *ro_row = CONST_CAST(struct ovsdb_row *, ro_row_);
897
898     if (ro_row->txn_row) {
899         ovs_assert(ro_row == ro_row->txn_row->new);
900         return ro_row;
901     } else {
902         struct ovsdb_table *table = ro_row->table;
903         struct ovsdb_row *rw_row;
904
905         rw_row = ovsdb_row_clone(ro_row);
906         rw_row->n_refs = ro_row->n_refs;
907         uuid_generate(ovsdb_row_get_version_rw(rw_row));
908         ovsdb_txn_row_create(txn, table, ro_row, rw_row);
909         hmap_replace(&table->rows, &ro_row->hmap_node, &rw_row->hmap_node);
910
911         return rw_row;
912     }
913 }
914
915 void
916 ovsdb_txn_row_insert(struct ovsdb_txn *txn, struct ovsdb_row *row)
917 {
918     uint32_t hash = ovsdb_row_hash(row);
919     struct ovsdb_table *table = row->table;
920
921     uuid_generate(ovsdb_row_get_version_rw(row));
922
923     ovsdb_txn_row_create(txn, table, NULL, row);
924     hmap_insert(&table->rows, &row->hmap_node, hash);
925 }
926
927 /* 'row' must be assumed destroyed upon return; the caller must not reference
928  * it again. */
929 void
930 ovsdb_txn_row_delete(struct ovsdb_txn *txn, const struct ovsdb_row *row_)
931 {
932     struct ovsdb_row *row = CONST_CAST(struct ovsdb_row *, row_);
933     struct ovsdb_table *table = row->table;
934     struct ovsdb_txn_row *txn_row = row->txn_row;
935
936     hmap_remove(&table->rows, &row->hmap_node);
937
938     if (!txn_row) {
939         ovsdb_txn_row_create(txn, table, row, NULL);
940     } else {
941         ovs_assert(txn_row->new == row);
942         if (txn_row->old) {
943             txn_row->new = NULL;
944         } else {
945             hmap_remove(&table->txn_table->txn_rows, &txn_row->hmap_node);
946             free(txn_row);
947         }
948         ovsdb_row_destroy(row);
949     }
950 }
951
952 void
953 ovsdb_txn_add_comment(struct ovsdb_txn *txn, const char *s)
954 {
955     if (txn->comment.length) {
956         ds_put_char(&txn->comment, '\n');
957     }
958     ds_put_cstr(&txn->comment, s);
959 }
960
961 const char *
962 ovsdb_txn_get_comment(const struct ovsdb_txn *txn)
963 {
964     return txn->comment.length ? ds_cstr_ro(&txn->comment) : NULL;
965 }
966 \f
967 static void
968 ovsdb_txn_row_prefree(struct ovsdb_txn_row *txn_row)
969 {
970     struct ovsdb_txn_table *txn_table = txn_row->table->txn_table;
971
972     txn_table->n_processed--;
973     hmap_remove(&txn_table->txn_rows, &txn_row->hmap_node);
974
975     if (txn_row->old) {
976         txn_row->old->txn_row = NULL;
977     }
978     if (txn_row->new) {
979         txn_row->new->txn_row = NULL;
980     }
981 }
982
983 static void
984 ovsdb_txn_table_destroy(struct ovsdb_txn_table *txn_table)
985 {
986     size_t i;
987
988     ovs_assert(hmap_is_empty(&txn_table->txn_rows));
989
990     for (i = 0; i < txn_table->table->schema->n_indexes; i++) {
991         hmap_destroy(&txn_table->txn_indexes[i]);
992     }
993     free(txn_table->txn_indexes);
994
995     txn_table->table->txn_table = NULL;
996     hmap_destroy(&txn_table->txn_rows);
997     list_remove(&txn_table->node);
998     free(txn_table);
999 }
1000
1001 /* Calls 'cb' for every txn_row within 'txn'.  If 'cb' returns nonnull, this
1002  * aborts the iteration and for_each_txn_row() passes the error up.  Otherwise,
1003  * returns a null pointer after iteration is complete.
1004  *
1005  * 'cb' may insert new txn_rows and new txn_tables into 'txn'.  It may delete
1006  * the txn_row that it is passed in, or txn_rows in txn_tables other than the
1007  * one passed to 'cb'.  It may *not* delete txn_rows other than the one passed
1008  * in within the same txn_table.  It may *not* delete any txn_tables.  As long
1009  * as these rules are followed, 'cb' will be called exactly once for each
1010  * txn_row in 'txn', even those added by 'cb'.
1011  *
1012  * (Even though 'cb' is not allowed to delete some txn_rows, it can still
1013  * delete any actual row by clearing a txn_row's 'new' member.)
1014  */
1015 static struct ovsdb_error * WARN_UNUSED_RESULT
1016 for_each_txn_row(struct ovsdb_txn *txn,
1017                  struct ovsdb_error *(*cb)(struct ovsdb_txn *,
1018                                            struct ovsdb_txn_row *))
1019 {
1020     bool any_work;
1021
1022     serial++;
1023
1024     do {
1025         struct ovsdb_txn_table *t, *next_txn_table;
1026
1027         any_work = false;
1028         LIST_FOR_EACH_SAFE (t, next_txn_table, node, &txn->txn_tables) {
1029             if (t->serial != serial) {
1030                 t->serial = serial;
1031                 t->n_processed = 0;
1032             }
1033
1034             while (t->n_processed < hmap_count(&t->txn_rows)) {
1035                 struct ovsdb_txn_row *r, *next_txn_row;
1036
1037                 HMAP_FOR_EACH_SAFE (r, next_txn_row, hmap_node, &t->txn_rows) {
1038                     if (r->serial != serial) {
1039                         struct ovsdb_error *error;
1040
1041                         r->serial = serial;
1042                         t->n_processed++;
1043                         any_work = true;
1044
1045                         error = cb(txn, r);
1046                         if (error) {
1047                             return error;
1048                         }
1049                     }
1050                 }
1051             }
1052             if (hmap_is_empty(&t->txn_rows)) {
1053                 /* Table is empty.  Drop it. */
1054                 ovsdb_txn_table_destroy(t);
1055             }
1056         }
1057     } while (any_work);
1058
1059     return NULL;
1060 }