Setting tag sliver-openvswitch-2.2.90-1
[sliver-openvswitch.git] / ovsdb / transaction.c
1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "transaction.h"
19
20 #include "bitmap.h"
21 #include "dynamic-string.h"
22 #include "hash.h"
23 #include "hmap.h"
24 #include "json.h"
25 #include "list.h"
26 #include "ovsdb-error.h"
27 #include "ovsdb.h"
28 #include "row.h"
29 #include "table.h"
30 #include "uuid.h"
31
32 struct ovsdb_txn {
33     struct ovsdb *db;
34     struct list txn_tables;     /* Contains "struct ovsdb_txn_table"s. */
35     struct ds comment;
36 };
37
38 /* A table modified by a transaction. */
39 struct ovsdb_txn_table {
40     struct list node;           /* Element in ovsdb_txn's txn_tables list. */
41     struct ovsdb_table *table;
42     struct hmap txn_rows;       /* Contains "struct ovsdb_txn_row"s. */
43
44     /* This has the same form as the 'indexes' member of struct ovsdb_table,
45      * but it is only used or updated at transaction commit time, from
46      * check_index_uniqueness(). */
47     struct hmap *txn_indexes;
48
49     /* Used by for_each_txn_row(). */
50     unsigned int serial;        /* Serial number of in-progress iteration. */
51     unsigned int n_processed;   /* Number of rows processed. */
52 };
53
54 /* A row modified by the transaction:
55  *
56  *      - A row added by a transaction will have null 'old' and non-null 'new'.
57  *
58  *      - A row deleted by a transaction will have non-null 'old' and null
59  *        'new'.
60  *
61  *      - A row modified by a transaction will have non-null 'old' and 'new'.
62  *
63  *      - 'old' and 'new' both null indicates that a row was added then deleted
64  *        within a single transaction.  Most of the time we instead delete the
65  *        ovsdb_txn_row entirely, but inside a for_each_txn_row() callback
66  *        there are restrictions that sometimes mean we have to leave the
67  *        ovsdb_txn_row in place.
68  */
69 struct ovsdb_txn_row {
70     struct hmap_node hmap_node; /* In ovsdb_txn_table's txn_rows hmap. */
71     struct ovsdb_row *old;      /* The old row. */
72     struct ovsdb_row *new;      /* The new row. */
73     size_t n_refs;              /* Number of remaining references. */
74
75     /* These members are the same as the corresponding members of 'old' or
76      * 'new'.  They are present here for convenience and because occasionally
77      * there can be an ovsdb_txn_row where both 'old' and 'new' are NULL. */
78     struct uuid uuid;
79     struct ovsdb_table *table;
80
81     /* Used by for_each_txn_row(). */
82     unsigned int serial;        /* Serial number of in-progress commit. */
83
84     unsigned long changed[];    /* Bits set to 1 for columns that changed. */
85 };
86
87 static struct ovsdb_error * WARN_UNUSED_RESULT
88 delete_garbage_row(struct ovsdb_txn *txn, struct ovsdb_txn_row *r);
89 static void ovsdb_txn_row_prefree(struct ovsdb_txn_row *);
90 static struct ovsdb_error * WARN_UNUSED_RESULT
91 for_each_txn_row(struct ovsdb_txn *txn,
92                       struct ovsdb_error *(*)(struct ovsdb_txn *,
93                                               struct ovsdb_txn_row *));
94
95 /* Used by for_each_txn_row() to track tables and rows that have been
96  * processed.  */
97 static unsigned int serial;
98
99 struct ovsdb_txn *
100 ovsdb_txn_create(struct ovsdb *db)
101 {
102     struct ovsdb_txn *txn = xmalloc(sizeof *txn);
103     txn->db = db;
104     list_init(&txn->txn_tables);
105     ds_init(&txn->comment);
106     return txn;
107 }
108
109 static void
110 ovsdb_txn_free(struct ovsdb_txn *txn)
111 {
112     ovs_assert(list_is_empty(&txn->txn_tables));
113     ds_destroy(&txn->comment);
114     free(txn);
115 }
116
117 static struct ovsdb_error *
118 ovsdb_txn_row_abort(struct ovsdb_txn *txn OVS_UNUSED,
119                     struct ovsdb_txn_row *txn_row)
120 {
121     struct ovsdb_row *old = txn_row->old;
122     struct ovsdb_row *new = txn_row->new;
123
124     ovsdb_txn_row_prefree(txn_row);
125     if (!old) {
126         if (new) {
127             hmap_remove(&new->table->rows, &new->hmap_node);
128         }
129     } else if (!new) {
130         hmap_insert(&old->table->rows, &old->hmap_node, ovsdb_row_hash(old));
131     } else {
132         hmap_replace(&new->table->rows, &new->hmap_node, &old->hmap_node);
133     }
134     ovsdb_row_destroy(new);
135     free(txn_row);
136
137     return NULL;
138 }
139
140 /* Returns the offset in bytes from the start of an ovsdb_row for 'table' to
141  * the hmap_node for the index numbered 'i'. */
142 static size_t
143 ovsdb_row_index_offset__(const struct ovsdb_table *table, size_t i)
144 {
145     size_t n_fields = shash_count(&table->schema->columns);
146     return (offsetof(struct ovsdb_row, fields)
147             + n_fields * sizeof(struct ovsdb_datum)
148             + i * sizeof(struct hmap_node));
149 }
150
151 /* Returns the hmap_node in 'row' for the index numbered 'i'. */
152 static struct hmap_node *
153 ovsdb_row_get_index_node(struct ovsdb_row *row, size_t i)
154 {
155     return (void *) ((char *) row + ovsdb_row_index_offset__(row->table, i));
156 }
157
158 /* Returns the ovsdb_row given 'index_node', which is a pointer to that row's
159  * hmap_node for the index numbered 'i' within 'table'. */
160 static struct ovsdb_row *
161 ovsdb_row_from_index_node(struct hmap_node *index_node,
162                           const struct ovsdb_table *table, size_t i)
163 {
164     return (void *) ((char *) index_node - ovsdb_row_index_offset__(table, i));
165 }
166
167 void
168 ovsdb_txn_abort(struct ovsdb_txn *txn)
169 {
170     ovsdb_error_assert(for_each_txn_row(txn, ovsdb_txn_row_abort));
171     ovsdb_txn_free(txn);
172 }
173
174 static struct ovsdb_txn_row *
175 find_txn_row(const struct ovsdb_table *table, const struct uuid *uuid)
176 {
177     struct ovsdb_txn_row *txn_row;
178
179     if (!table->txn_table) {
180         return NULL;
181     }
182
183     HMAP_FOR_EACH_WITH_HASH (txn_row, hmap_node,
184                              uuid_hash(uuid), &table->txn_table->txn_rows) {
185         if (uuid_equals(uuid, &txn_row->uuid)) {
186             return txn_row;
187         }
188     }
189
190     return NULL;
191 }
192
193 static struct ovsdb_txn_row *
194 find_or_make_txn_row(struct ovsdb_txn *txn, const struct ovsdb_table *table,
195                      const struct uuid *uuid)
196 {
197     struct ovsdb_txn_row *txn_row = find_txn_row(table, uuid);
198     if (!txn_row) {
199         const struct ovsdb_row *row = ovsdb_table_get_row(table, uuid);
200         if (row) {
201             txn_row = ovsdb_txn_row_modify(txn, row)->txn_row;
202         }
203     }
204     return txn_row;
205 }
206
207 static struct ovsdb_error * WARN_UNUSED_RESULT
208 ovsdb_txn_adjust_atom_refs(struct ovsdb_txn *txn, const struct ovsdb_row *r,
209                            const struct ovsdb_column *c,
210                            const struct ovsdb_base_type *base,
211                            const union ovsdb_atom *atoms, unsigned int n,
212                            int delta)
213 {
214     const struct ovsdb_table *table;
215     unsigned int i;
216
217     if (!ovsdb_base_type_is_strong_ref(base)) {
218         return NULL;
219     }
220
221     table = base->u.uuid.refTable;
222     for (i = 0; i < n; i++) {
223         const struct uuid *uuid = &atoms[i].uuid;
224         struct ovsdb_txn_row *txn_row;
225
226         if (uuid_equals(uuid, ovsdb_row_get_uuid(r))) {
227             /* Self-references don't count. */
228             continue;
229         }
230
231         txn_row = find_or_make_txn_row(txn, table, uuid);
232         if (!txn_row) {
233             return ovsdb_error("referential integrity violation",
234                                "Table %s column %s row "UUID_FMT" "
235                                "references nonexistent row "UUID_FMT" in "
236                                "table %s.",
237                                r->table->schema->name, c->name,
238                                UUID_ARGS(ovsdb_row_get_uuid(r)),
239                                UUID_ARGS(uuid), table->schema->name);
240         }
241         txn_row->n_refs += delta;
242     }
243
244     return NULL;
245 }
246
247 static struct ovsdb_error * WARN_UNUSED_RESULT
248 ovsdb_txn_adjust_row_refs(struct ovsdb_txn *txn, const struct ovsdb_row *r,
249                           const struct ovsdb_column *column, int delta)
250 {
251     const struct ovsdb_datum *field = &r->fields[column->index];
252     struct ovsdb_error *error;
253
254     error = ovsdb_txn_adjust_atom_refs(txn, r, column, &column->type.key,
255                                        field->keys, field->n, delta);
256     if (!error) {
257         error = ovsdb_txn_adjust_atom_refs(txn, r, column, &column->type.value,
258                                            field->values, field->n, delta);
259     }
260     return error;
261 }
262
263 static struct ovsdb_error * WARN_UNUSED_RESULT
264 update_row_ref_count(struct ovsdb_txn *txn, struct ovsdb_txn_row *r)
265 {
266     struct ovsdb_table *table = r->table;
267     struct shash_node *node;
268
269     SHASH_FOR_EACH (node, &table->schema->columns) {
270         const struct ovsdb_column *column = node->data;
271         struct ovsdb_error *error;
272
273         if (r->old) {
274             error = ovsdb_txn_adjust_row_refs(txn, r->old, column, -1);
275             if (error) {
276                 return OVSDB_WRAP_BUG("error decreasing refcount", error);
277             }
278         }
279         if (r->new) {
280             error = ovsdb_txn_adjust_row_refs(txn, r->new, column, 1);
281             if (error) {
282                 return error;
283             }
284         }
285     }
286
287     return NULL;
288 }
289
290 static struct ovsdb_error * WARN_UNUSED_RESULT
291 check_ref_count(struct ovsdb_txn *txn OVS_UNUSED, struct ovsdb_txn_row *r)
292 {
293     if (r->new || !r->n_refs) {
294         return NULL;
295     } else {
296         return ovsdb_error("referential integrity violation",
297                            "cannot delete %s row "UUID_FMT" because "
298                            "of %"PRIuSIZE" remaining reference(s)",
299                            r->table->schema->name, UUID_ARGS(&r->uuid),
300                            r->n_refs);
301     }
302 }
303
304 static struct ovsdb_error * WARN_UNUSED_RESULT
305 delete_row_refs(struct ovsdb_txn *txn, const struct ovsdb_row *row,
306                 const struct ovsdb_base_type *base,
307                 const union ovsdb_atom *atoms, unsigned int n)
308 {
309     const struct ovsdb_table *table;
310     unsigned int i;
311
312     if (!ovsdb_base_type_is_strong_ref(base)) {
313         return NULL;
314     }
315
316     table = base->u.uuid.refTable;
317     for (i = 0; i < n; i++) {
318         const struct uuid *uuid = &atoms[i].uuid;
319         struct ovsdb_txn_row *txn_row;
320
321         if (uuid_equals(uuid, ovsdb_row_get_uuid(row))) {
322             /* Self-references don't count. */
323             continue;
324         }
325
326         txn_row = find_or_make_txn_row(txn, table, uuid);
327         if (!txn_row) {
328             return OVSDB_BUG("strong ref target missing");
329         } else if (!txn_row->n_refs) {
330             return OVSDB_BUG("strong ref target has zero n_refs");
331         } else if (!txn_row->new) {
332             return OVSDB_BUG("deleted strong ref target");
333         }
334
335         if (--txn_row->n_refs == 0) {
336             struct ovsdb_error *error = delete_garbage_row(txn, txn_row);
337             if (error) {
338                 return error;
339             }
340         }
341     }
342
343     return NULL;
344 }
345
346 static struct ovsdb_error * WARN_UNUSED_RESULT
347 delete_garbage_row(struct ovsdb_txn *txn, struct ovsdb_txn_row *txn_row)
348 {
349     struct shash_node *node;
350     struct ovsdb_row *row;
351
352     if (txn_row->table->schema->is_root) {
353         return NULL;
354     }
355
356     row = txn_row->new;
357     txn_row->new = NULL;
358     hmap_remove(&txn_row->table->rows, &row->hmap_node);
359     SHASH_FOR_EACH (node, &txn_row->table->schema->columns) {
360         const struct ovsdb_column *column = node->data;
361         const struct ovsdb_datum *field = &row->fields[column->index];
362         struct ovsdb_error *error;
363
364         error = delete_row_refs(txn, row,
365                                 &column->type.key, field->keys, field->n);
366         if (error) {
367             return error;
368         }
369
370         error = delete_row_refs(txn, row,
371                                 &column->type.value, field->values, field->n);
372         if (error) {
373             return error;
374         }
375     }
376     ovsdb_row_destroy(row);
377
378     return NULL;
379 }
380
381 static struct ovsdb_error * WARN_UNUSED_RESULT
382 collect_garbage(struct ovsdb_txn *txn, struct ovsdb_txn_row *txn_row)
383 {
384     if (txn_row->new && !txn_row->n_refs) {
385         return delete_garbage_row(txn, txn_row);
386     }
387     return NULL;
388 }
389
390 static struct ovsdb_error * WARN_UNUSED_RESULT
391 update_ref_counts(struct ovsdb_txn *txn)
392 {
393     struct ovsdb_error *error;
394
395     error = for_each_txn_row(txn, update_row_ref_count);
396     if (error) {
397         return error;
398     }
399
400     return for_each_txn_row(txn, check_ref_count);
401 }
402
403 static struct ovsdb_error *
404 ovsdb_txn_row_commit(struct ovsdb_txn *txn OVS_UNUSED,
405                      struct ovsdb_txn_row *txn_row)
406 {
407     size_t n_indexes = txn_row->table->schema->n_indexes;
408
409     if (txn_row->old) {
410         size_t i;
411
412         for (i = 0; i < n_indexes; i++) {
413             struct hmap_node *node = ovsdb_row_get_index_node(txn_row->old, i);
414             hmap_remove(&txn_row->table->indexes[i], node);
415         }
416     }
417     if (txn_row->new) {
418         size_t i;
419
420         for (i = 0; i < n_indexes; i++) {
421             struct hmap_node *node = ovsdb_row_get_index_node(txn_row->new, i);
422             hmap_insert(&txn_row->table->indexes[i], node, node->hash);
423         }
424     }
425
426     ovsdb_txn_row_prefree(txn_row);
427     if (txn_row->new) {
428         txn_row->new->n_refs = txn_row->n_refs;
429     }
430     ovsdb_row_destroy(txn_row->old);
431     free(txn_row);
432
433     return NULL;
434 }
435
436 static void
437 add_weak_ref(struct ovsdb_txn *txn,
438              const struct ovsdb_row *src_, const struct ovsdb_row *dst_)
439 {
440     struct ovsdb_row *src = CONST_CAST(struct ovsdb_row *, src_);
441     struct ovsdb_row *dst = CONST_CAST(struct ovsdb_row *, dst_);
442     struct ovsdb_weak_ref *weak;
443
444     if (src == dst) {
445         return;
446     }
447
448     dst = ovsdb_txn_row_modify(txn, dst);
449
450     if (!list_is_empty(&dst->dst_refs)) {
451         /* Omit duplicates. */
452         weak = CONTAINER_OF(list_back(&dst->dst_refs),
453                             struct ovsdb_weak_ref, dst_node);
454         if (weak->src == src) {
455             return;
456         }
457     }
458
459     weak = xmalloc(sizeof *weak);
460     weak->src = src;
461     list_push_back(&dst->dst_refs, &weak->dst_node);
462     list_push_back(&src->src_refs, &weak->src_node);
463 }
464
465 static struct ovsdb_error * WARN_UNUSED_RESULT
466 assess_weak_refs(struct ovsdb_txn *txn, struct ovsdb_txn_row *txn_row)
467 {
468     struct ovsdb_table *table;
469     struct shash_node *node;
470
471     if (txn_row->old) {
472         /* Mark rows that have weak references to 'txn_row' as modified, so
473          * that their weak references will get reassessed. */
474         struct ovsdb_weak_ref *weak, *next;
475
476         LIST_FOR_EACH_SAFE (weak, next, dst_node, &txn_row->old->dst_refs) {
477             if (!weak->src->txn_row) {
478                 ovsdb_txn_row_modify(txn, weak->src);
479             }
480         }
481     }
482
483     if (!txn_row->new) {
484         /* We don't have to do anything about references that originate at
485          * 'txn_row', because ovsdb_row_destroy() will remove those weak
486          * references. */
487         return NULL;
488     }
489
490     table = txn_row->table;
491     SHASH_FOR_EACH (node, &table->schema->columns) {
492         const struct ovsdb_column *column = node->data;
493         struct ovsdb_datum *datum = &txn_row->new->fields[column->index];
494         unsigned int orig_n, i;
495         bool zero = false;
496
497         orig_n = datum->n;
498
499         if (ovsdb_base_type_is_weak_ref(&column->type.key)) {
500             for (i = 0; i < datum->n; ) {
501                 const struct ovsdb_row *row;
502
503                 row = ovsdb_table_get_row(column->type.key.u.uuid.refTable,
504                                           &datum->keys[i].uuid);
505                 if (row) {
506                     add_weak_ref(txn, txn_row->new, row);
507                     i++;
508                 } else {
509                     if (uuid_is_zero(&datum->keys[i].uuid)) {
510                         zero = true;
511                     }
512                     ovsdb_datum_remove_unsafe(datum, i, &column->type);
513                 }
514             }
515         }
516
517         if (ovsdb_base_type_is_weak_ref(&column->type.value)) {
518             for (i = 0; i < datum->n; ) {
519                 const struct ovsdb_row *row;
520
521                 row = ovsdb_table_get_row(column->type.value.u.uuid.refTable,
522                                           &datum->values[i].uuid);
523                 if (row) {
524                     add_weak_ref(txn, txn_row->new, row);
525                     i++;
526                 } else {
527                     if (uuid_is_zero(&datum->values[i].uuid)) {
528                         zero = true;
529                     }
530                     ovsdb_datum_remove_unsafe(datum, i, &column->type);
531                 }
532             }
533         }
534
535         if (datum->n != orig_n) {
536             bitmap_set1(txn_row->changed, OVSDB_COL_VERSION);
537             bitmap_set1(txn_row->changed, column->index);
538             ovsdb_datum_sort_assert(datum, column->type.key.type);
539             if (datum->n < column->type.n_min) {
540                 const struct uuid *row_uuid = ovsdb_row_get_uuid(txn_row->new);
541                 if (zero && !txn_row->old) {
542                     return ovsdb_error(
543                         "constraint violation",
544                         "Weak reference column \"%s\" in \"%s\" row "UUID_FMT
545                         " (inserted within this transaction) contained "
546                         "all-zeros UUID (probably as the default value for "
547                         "this column) but deleting this value caused a "
548                         "constraint volation because this column is not "
549                         "allowed to be empty.", column->name,
550                         table->schema->name, UUID_ARGS(row_uuid));
551                 } else {
552                     return ovsdb_error(
553                         "constraint violation",
554                         "Deletion of %u weak reference(s) to deleted (or "
555                         "never-existing) rows from column \"%s\" in \"%s\" "
556                         "row "UUID_FMT" %scaused this column to become empty, "
557                         "but constraints on this column disallow an "
558                         "empty column.",
559                         orig_n - datum->n, column->name, table->schema->name,
560                         UUID_ARGS(row_uuid),
561                         (txn_row->old
562                          ? ""
563                          : "(inserted within this transaction) "));
564                 }
565             }
566         }
567     }
568
569     return NULL;
570 }
571
572 static struct ovsdb_error * WARN_UNUSED_RESULT
573 determine_changes(struct ovsdb_txn *txn, struct ovsdb_txn_row *txn_row)
574 {
575     struct ovsdb_table *table = txn_row->table;
576
577     if (txn_row->old && txn_row->new) {
578         struct shash_node *node;
579         bool changed = false;
580
581         SHASH_FOR_EACH (node, &table->schema->columns) {
582             const struct ovsdb_column *column = node->data;
583             const struct ovsdb_type *type = &column->type;
584             unsigned int idx = column->index;
585
586             if (!ovsdb_datum_equals(&txn_row->old->fields[idx],
587                                     &txn_row->new->fields[idx],
588                                     type)) {
589                 bitmap_set1(txn_row->changed, idx);
590                 changed = true;
591             }
592         }
593
594         if (!changed) {
595             /* Nothing actually changed in this row, so drop it. */
596             ovsdb_txn_row_abort(txn, txn_row);
597         }
598     } else {
599         bitmap_set_multiple(txn_row->changed, 0,
600                             shash_count(&table->schema->columns), 1);
601     }
602
603     return NULL;
604 }
605
606 static struct ovsdb_error * WARN_UNUSED_RESULT
607 check_max_rows(struct ovsdb_txn *txn)
608 {
609     struct ovsdb_txn_table *t;
610
611     LIST_FOR_EACH (t, node, &txn->txn_tables) {
612         size_t n_rows = hmap_count(&t->table->rows);
613         unsigned int max_rows = t->table->schema->max_rows;
614
615         if (n_rows > max_rows) {
616             return ovsdb_error("constraint violation",
617                                "transaction causes \"%s\" table to contain "
618                                "%"PRIuSIZE" rows, greater than the schema-defined "
619                                "limit of %u row(s)",
620                                t->table->schema->name, n_rows, max_rows);
621         }
622     }
623
624     return NULL;
625 }
626
627 static struct ovsdb_row *
628 ovsdb_index_search(struct hmap *index, struct ovsdb_row *row, size_t i,
629                    uint32_t hash)
630 {
631     const struct ovsdb_table *table = row->table;
632     const struct ovsdb_column_set *columns = &table->schema->indexes[i];
633     struct hmap_node *node;
634
635     for (node = hmap_first_with_hash(index, hash); node;
636          node = hmap_next_with_hash(node)) {
637         struct ovsdb_row *irow = ovsdb_row_from_index_node(node, table, i);
638         if (ovsdb_row_equal_columns(row, irow, columns)) {
639             return irow;
640         }
641     }
642
643     return NULL;
644 }
645
646 static void
647 duplicate_index_row__(const struct ovsdb_column_set *index,
648                       const struct ovsdb_row *row,
649                       const char *title,
650                       struct ds *out)
651 {
652     size_t n_columns = shash_count(&row->table->schema->columns);
653
654     ds_put_format(out, "%s row, with UUID "UUID_FMT", ",
655                   title, UUID_ARGS(ovsdb_row_get_uuid(row)));
656     if (!row->txn_row
657         || bitmap_scan(row->txn_row->changed, 1, 0, n_columns) == n_columns) {
658         ds_put_cstr(out, "existed in the database before this "
659                     "transaction and was not modified by the transaction.");
660     } else if (!row->txn_row->old) {
661         ds_put_cstr(out, "was inserted by this transaction.");
662     } else if (ovsdb_row_equal_columns(row->txn_row->old,
663                                        row->txn_row->new, index)) {
664         ds_put_cstr(out, "existed in the database before this "
665                     "transaction, which modified some of the row's columns "
666                     "but not any columns in this index.");
667     } else {
668         ds_put_cstr(out, "had the following index values before the "
669                     "transaction: ");
670         ovsdb_row_columns_to_string(row->txn_row->old, index, out);
671         ds_put_char(out, '.');
672     }
673 }
674
675 static struct ovsdb_error * WARN_UNUSED_RESULT
676 duplicate_index_row(const struct ovsdb_column_set *index,
677                     const struct ovsdb_row *a,
678                     const struct ovsdb_row *b)
679 {
680     struct ovsdb_column_set all_columns;
681     struct ovsdb_error *error;
682     char *index_s;
683     struct ds s;
684
685     /* Put 'a' and 'b' in a predictable order to make error messages
686      * reproducible for testing. */
687     ovsdb_column_set_init(&all_columns);
688     ovsdb_column_set_add_all(&all_columns, a->table);
689     if (ovsdb_row_compare_columns_3way(a, b, &all_columns) < 0) {
690         const struct ovsdb_row *tmp = a;
691         a = b;
692         b = tmp;
693     }
694     ovsdb_column_set_destroy(&all_columns);
695
696     index_s = ovsdb_column_set_to_string(index);
697
698     ds_init(&s);
699     ds_put_format(&s, "Transaction causes multiple rows in \"%s\" table to "
700                   "have identical values (", a->table->schema->name);
701     ovsdb_row_columns_to_string(a, index, &s);
702     ds_put_format(&s, ") for index on %s.  ", index_s);
703     duplicate_index_row__(index, a, "First", &s);
704     ds_put_cstr(&s, "  ");
705     duplicate_index_row__(index, b, "Second", &s);
706
707     free(index_s);
708
709     error = ovsdb_error("constraint violation", "%s", ds_cstr(&s));
710     ds_destroy(&s);
711     return error;
712 }
713
714 static struct ovsdb_error * WARN_UNUSED_RESULT
715 check_index_uniqueness(struct ovsdb_txn *txn OVS_UNUSED,
716                        struct ovsdb_txn_row *txn_row)
717 {
718     struct ovsdb_txn_table *txn_table = txn_row->table->txn_table;
719     struct ovsdb_table *table = txn_row->table;
720     struct ovsdb_row *row = txn_row->new;
721     size_t i;
722
723     if (!row) {
724         return NULL;
725     }
726
727     for (i = 0; i < table->schema->n_indexes; i++) {
728         const struct ovsdb_column_set *index = &table->schema->indexes[i];
729         struct ovsdb_row *irow;
730         uint32_t hash;
731
732         hash = ovsdb_row_hash_columns(row, index, 0);
733         irow = ovsdb_index_search(&txn_table->txn_indexes[i], row, i, hash);
734         if (irow) {
735             return duplicate_index_row(index, irow, row);
736         }
737
738         irow = ovsdb_index_search(&table->indexes[i], row, i, hash);
739         if (irow && !irow->txn_row) {
740             return duplicate_index_row(index, irow, row);
741         }
742
743         hmap_insert(&txn_table->txn_indexes[i],
744                     ovsdb_row_get_index_node(row, i), hash);
745     }
746
747     return NULL;
748 }
749
750 struct ovsdb_error *
751 ovsdb_txn_commit(struct ovsdb_txn *txn, bool durable)
752 {
753     struct ovsdb_replica *replica;
754     struct ovsdb_error *error;
755
756     /* Figure out what actually changed, and abort early if the transaction
757      * was really a no-op. */
758     error = for_each_txn_row(txn, determine_changes);
759     if (error) {
760         return OVSDB_WRAP_BUG("can't happen", error);
761     }
762     if (list_is_empty(&txn->txn_tables)) {
763         ovsdb_txn_abort(txn);
764         return NULL;
765     }
766
767     /* Update reference counts and check referential integrity. */
768     error = update_ref_counts(txn);
769     if (error) {
770         ovsdb_txn_abort(txn);
771         return error;
772     }
773
774     /* Delete unreferenced, non-root rows. */
775     error = for_each_txn_row(txn, collect_garbage);
776     if (error) {
777         ovsdb_txn_abort(txn);
778         return OVSDB_WRAP_BUG("can't happen", error);
779     }
780
781     /* Check maximum rows table constraints. */
782     error = check_max_rows(txn);
783     if (error) {
784         ovsdb_txn_abort(txn);
785         return error;
786     }
787
788     /* Check reference counts and remove bad references for "weak" referential
789      * integrity. */
790     error = for_each_txn_row(txn, assess_weak_refs);
791     if (error) {
792         ovsdb_txn_abort(txn);
793         return error;
794     }
795
796     /* Verify that the indexes will still be unique post-transaction. */
797     error = for_each_txn_row(txn, check_index_uniqueness);
798     if (error) {
799         ovsdb_txn_abort(txn);
800         return error;
801     }
802
803     /* Send the commit to each replica. */
804     LIST_FOR_EACH (replica, node, &txn->db->replicas) {
805         error = (replica->class->commit)(replica, txn, durable);
806         if (error) {
807             /* We don't support two-phase commit so only the first replica is
808              * allowed to report an error. */
809             ovs_assert(&replica->node == txn->db->replicas.next);
810
811             ovsdb_txn_abort(txn);
812             return error;
813         }
814     }
815
816     /* Finalize commit. */
817     txn->db->run_triggers = true;
818     ovsdb_error_assert(for_each_txn_row(txn, ovsdb_txn_row_commit));
819     ovsdb_txn_free(txn);
820
821     return NULL;
822 }
823
824 void
825 ovsdb_txn_for_each_change(const struct ovsdb_txn *txn,
826                           ovsdb_txn_row_cb_func *cb, void *aux)
827 {
828     struct ovsdb_txn_table *t;
829     struct ovsdb_txn_row *r;
830
831     LIST_FOR_EACH (t, node, &txn->txn_tables) {
832         HMAP_FOR_EACH (r, hmap_node, &t->txn_rows) {
833             if ((r->old || r->new) && !cb(r->old, r->new, r->changed, aux)) {
834                 break;
835             }
836         }
837    }
838 }
839
840 static struct ovsdb_txn_table *
841 ovsdb_txn_create_txn_table(struct ovsdb_txn *txn, struct ovsdb_table *table)
842 {
843     if (!table->txn_table) {
844         struct ovsdb_txn_table *txn_table;
845         size_t i;
846
847         table->txn_table = txn_table = xmalloc(sizeof *table->txn_table);
848         txn_table->table = table;
849         hmap_init(&txn_table->txn_rows);
850         txn_table->serial = serial - 1;
851         txn_table->txn_indexes = xmalloc(table->schema->n_indexes
852                                          * sizeof *txn_table->txn_indexes);
853         for (i = 0; i < table->schema->n_indexes; i++) {
854             hmap_init(&txn_table->txn_indexes[i]);
855         }
856         list_push_back(&txn->txn_tables, &txn_table->node);
857     }
858     return table->txn_table;
859 }
860
861 static struct ovsdb_txn_row *
862 ovsdb_txn_row_create(struct ovsdb_txn *txn, struct ovsdb_table *table,
863                      const struct ovsdb_row *old_, struct ovsdb_row *new)
864 {
865     const struct ovsdb_row *row = old_ ? old_ : new;
866     struct ovsdb_row *old = CONST_CAST(struct ovsdb_row *, old_);
867     size_t n_columns = shash_count(&table->schema->columns);
868     struct ovsdb_txn_table *txn_table;
869     struct ovsdb_txn_row *txn_row;
870
871     txn_row = xzalloc(offsetof(struct ovsdb_txn_row, changed)
872                       + bitmap_n_bytes(n_columns));
873     txn_row->uuid = *ovsdb_row_get_uuid(row);
874     txn_row->table = row->table;
875     txn_row->old = old;
876     txn_row->new = new;
877     txn_row->n_refs = old ? old->n_refs : 0;
878     txn_row->serial = serial - 1;
879
880     if (old) {
881         old->txn_row = txn_row;
882     }
883     if (new) {
884         new->txn_row = txn_row;
885     }
886
887     txn_table = ovsdb_txn_create_txn_table(txn, table);
888     hmap_insert(&txn_table->txn_rows, &txn_row->hmap_node,
889                 ovsdb_row_hash(old ? old : new));
890
891     return txn_row;
892 }
893
894 struct ovsdb_row *
895 ovsdb_txn_row_modify(struct ovsdb_txn *txn, const struct ovsdb_row *ro_row_)
896 {
897     struct ovsdb_row *ro_row = CONST_CAST(struct ovsdb_row *, ro_row_);
898
899     if (ro_row->txn_row) {
900         ovs_assert(ro_row == ro_row->txn_row->new);
901         return ro_row;
902     } else {
903         struct ovsdb_table *table = ro_row->table;
904         struct ovsdb_row *rw_row;
905
906         rw_row = ovsdb_row_clone(ro_row);
907         rw_row->n_refs = ro_row->n_refs;
908         uuid_generate(ovsdb_row_get_version_rw(rw_row));
909         ovsdb_txn_row_create(txn, table, ro_row, rw_row);
910         hmap_replace(&table->rows, &ro_row->hmap_node, &rw_row->hmap_node);
911
912         return rw_row;
913     }
914 }
915
916 void
917 ovsdb_txn_row_insert(struct ovsdb_txn *txn, struct ovsdb_row *row)
918 {
919     uint32_t hash = ovsdb_row_hash(row);
920     struct ovsdb_table *table = row->table;
921
922     uuid_generate(ovsdb_row_get_version_rw(row));
923
924     ovsdb_txn_row_create(txn, table, NULL, row);
925     hmap_insert(&table->rows, &row->hmap_node, hash);
926 }
927
928 /* 'row' must be assumed destroyed upon return; the caller must not reference
929  * it again. */
930 void
931 ovsdb_txn_row_delete(struct ovsdb_txn *txn, const struct ovsdb_row *row_)
932 {
933     struct ovsdb_row *row = CONST_CAST(struct ovsdb_row *, row_);
934     struct ovsdb_table *table = row->table;
935     struct ovsdb_txn_row *txn_row = row->txn_row;
936
937     hmap_remove(&table->rows, &row->hmap_node);
938
939     if (!txn_row) {
940         ovsdb_txn_row_create(txn, table, row, NULL);
941     } else {
942         ovs_assert(txn_row->new == row);
943         if (txn_row->old) {
944             txn_row->new = NULL;
945         } else {
946             hmap_remove(&table->txn_table->txn_rows, &txn_row->hmap_node);
947             free(txn_row);
948         }
949         ovsdb_row_destroy(row);
950     }
951 }
952
953 void
954 ovsdb_txn_add_comment(struct ovsdb_txn *txn, const char *s)
955 {
956     if (txn->comment.length) {
957         ds_put_char(&txn->comment, '\n');
958     }
959     ds_put_cstr(&txn->comment, s);
960 }
961
962 const char *
963 ovsdb_txn_get_comment(const struct ovsdb_txn *txn)
964 {
965     return txn->comment.length ? ds_cstr_ro(&txn->comment) : NULL;
966 }
967 \f
968 static void
969 ovsdb_txn_row_prefree(struct ovsdb_txn_row *txn_row)
970 {
971     struct ovsdb_txn_table *txn_table = txn_row->table->txn_table;
972
973     txn_table->n_processed--;
974     hmap_remove(&txn_table->txn_rows, &txn_row->hmap_node);
975
976     if (txn_row->old) {
977         txn_row->old->txn_row = NULL;
978     }
979     if (txn_row->new) {
980         txn_row->new->txn_row = NULL;
981     }
982 }
983
984 static void
985 ovsdb_txn_table_destroy(struct ovsdb_txn_table *txn_table)
986 {
987     size_t i;
988
989     ovs_assert(hmap_is_empty(&txn_table->txn_rows));
990
991     for (i = 0; i < txn_table->table->schema->n_indexes; i++) {
992         hmap_destroy(&txn_table->txn_indexes[i]);
993     }
994     free(txn_table->txn_indexes);
995
996     txn_table->table->txn_table = NULL;
997     hmap_destroy(&txn_table->txn_rows);
998     list_remove(&txn_table->node);
999     free(txn_table);
1000 }
1001
1002 /* Calls 'cb' for every txn_row within 'txn'.  If 'cb' returns nonnull, this
1003  * aborts the iteration and for_each_txn_row() passes the error up.  Otherwise,
1004  * returns a null pointer after iteration is complete.
1005  *
1006  * 'cb' may insert new txn_rows and new txn_tables into 'txn'.  It may delete
1007  * the txn_row that it is passed in, or txn_rows in txn_tables other than the
1008  * one passed to 'cb'.  It may *not* delete txn_rows other than the one passed
1009  * in within the same txn_table.  It may *not* delete any txn_tables.  As long
1010  * as these rules are followed, 'cb' will be called exactly once for each
1011  * txn_row in 'txn', even those added by 'cb'.
1012  *
1013  * (Even though 'cb' is not allowed to delete some txn_rows, it can still
1014  * delete any actual row by clearing a txn_row's 'new' member.)
1015  */
1016 static struct ovsdb_error * WARN_UNUSED_RESULT
1017 for_each_txn_row(struct ovsdb_txn *txn,
1018                  struct ovsdb_error *(*cb)(struct ovsdb_txn *,
1019                                            struct ovsdb_txn_row *))
1020 {
1021     bool any_work;
1022
1023     serial++;
1024
1025     do {
1026         struct ovsdb_txn_table *t, *next_txn_table;
1027
1028         any_work = false;
1029         LIST_FOR_EACH_SAFE (t, next_txn_table, node, &txn->txn_tables) {
1030             if (t->serial != serial) {
1031                 t->serial = serial;
1032                 t->n_processed = 0;
1033             }
1034
1035             while (t->n_processed < hmap_count(&t->txn_rows)) {
1036                 struct ovsdb_txn_row *r, *next_txn_row;
1037
1038                 HMAP_FOR_EACH_SAFE (r, next_txn_row, hmap_node, &t->txn_rows) {
1039                     if (r->serial != serial) {
1040                         struct ovsdb_error *error;
1041
1042                         r->serial = serial;
1043                         t->n_processed++;
1044                         any_work = true;
1045
1046                         error = cb(txn, r);
1047                         if (error) {
1048                             return error;
1049                         }
1050                     }
1051                 }
1052             }
1053             if (hmap_is_empty(&t->txn_rows)) {
1054                 /* Table is empty.  Drop it. */
1055                 ovsdb_txn_table_destroy(t);
1056             }
1057         }
1058     } while (any_work);
1059
1060     return NULL;
1061 }