c07541ed3a049579d4db1325a98f48d8a77120d5
[sliver-openvswitch.git] / ovsdb / transaction.c
1 /* Copyright (c) 2009, 2010, 2011 Nicira Networks
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "transaction.h"
19
20 #include <assert.h>
21
22 #include "bitmap.h"
23 #include "dynamic-string.h"
24 #include "hash.h"
25 #include "hmap.h"
26 #include "json.h"
27 #include "list.h"
28 #include "ovsdb-error.h"
29 #include "ovsdb.h"
30 #include "row.h"
31 #include "table.h"
32 #include "uuid.h"
33
34 struct ovsdb_txn {
35     struct ovsdb *db;
36     struct list txn_tables;     /* Contains "struct ovsdb_txn_table"s. */
37     struct ds comment;
38 };
39
40 /* A table modified by a transaction. */
41 struct ovsdb_txn_table {
42     struct list node;           /* Element in ovsdb_txn's txn_tables list. */
43     struct ovsdb_table *table;
44     struct hmap txn_rows;       /* Contains "struct ovsdb_txn_row"s. */
45
46     /* Used by for_each_txn_row(). */
47     unsigned int serial;        /* Serial number of in-progress iteration. */
48     unsigned int n_processed;   /* Number of rows processed. */
49 };
50
51 /* A row modified by the transaction:
52  *
53  *      - A row added by a transaction will have null 'old' and non-null 'new'.
54  *
55  *      - A row deleted by a transaction will have non-null 'old' and null
56  *        'new'.
57  *
58  *      - A row modified by a transaction will have non-null 'old' and 'new'.
59  *
60  *      - 'old' and 'new' both null indicates that a row was added then deleted
61  *        within a single transaction.  Most of the time we instead delete the
62  *        ovsdb_txn_row entirely, but inside a for_each_txn_row() callback
63  *        there are restrictions that sometimes mean we have to leave the
64  *        ovsdb_txn_row in place.
65  */
66 struct ovsdb_txn_row {
67     struct hmap_node hmap_node; /* In ovsdb_txn_table's txn_rows hmap. */
68     struct ovsdb_row *old;      /* The old row. */
69     struct ovsdb_row *new;      /* The new row. */
70     size_t n_refs;              /* Number of remaining references. */
71
72     /* These members are the same as the corresponding members of 'old' or
73      * 'new'.  They are present here for convenience and because occasionally
74      * there can be an ovsdb_txn_row where both 'old' and 'new' are NULL. */
75     struct uuid uuid;
76     struct ovsdb_table *table;
77
78     /* Used by for_each_txn_row(). */
79     unsigned int serial;        /* Serial number of in-progress commit. */
80
81     unsigned long changed[];    /* Bits set to 1 for columns that changed. */
82 };
83
84 static struct ovsdb_error * WARN_UNUSED_RESULT
85 delete_garbage_row(struct ovsdb_txn *txn, struct ovsdb_txn_row *r);
86 static void ovsdb_txn_row_prefree(struct ovsdb_txn_row *);
87 static struct ovsdb_error * WARN_UNUSED_RESULT
88 for_each_txn_row(struct ovsdb_txn *txn,
89                       struct ovsdb_error *(*)(struct ovsdb_txn *,
90                                               struct ovsdb_txn_row *));
91
92 /* Used by for_each_txn_row() to track tables and rows that have been
93  * processed.  */
94 static unsigned int serial;
95
96 struct ovsdb_txn *
97 ovsdb_txn_create(struct ovsdb *db)
98 {
99     struct ovsdb_txn *txn = xmalloc(sizeof *txn);
100     txn->db = db;
101     list_init(&txn->txn_tables);
102     ds_init(&txn->comment);
103     return txn;
104 }
105
106 static void
107 ovsdb_txn_free(struct ovsdb_txn *txn)
108 {
109     assert(list_is_empty(&txn->txn_tables));
110     ds_destroy(&txn->comment);
111     free(txn);
112 }
113
114 static struct ovsdb_error *
115 ovsdb_txn_row_abort(struct ovsdb_txn *txn OVS_UNUSED,
116                     struct ovsdb_txn_row *txn_row)
117 {
118     struct ovsdb_row *old = txn_row->old;
119     struct ovsdb_row *new = txn_row->new;
120
121     ovsdb_txn_row_prefree(txn_row);
122     if (!old) {
123         if (new) {
124             hmap_remove(&new->table->rows, &new->hmap_node);
125         }
126     } else if (!new) {
127         hmap_insert(&old->table->rows, &old->hmap_node, ovsdb_row_hash(old));
128     } else {
129         hmap_replace(&new->table->rows, &new->hmap_node, &old->hmap_node);
130     }
131     ovsdb_row_destroy(new);
132     free(txn_row);
133
134     return NULL;
135 }
136
137 void
138 ovsdb_txn_abort(struct ovsdb_txn *txn)
139 {
140     ovsdb_error_assert(for_each_txn_row(txn, ovsdb_txn_row_abort));
141     ovsdb_txn_free(txn);
142 }
143
144 static struct ovsdb_txn_row *
145 find_txn_row(const struct ovsdb_table *table, const struct uuid *uuid)
146 {
147     struct ovsdb_txn_row *txn_row;
148
149     if (!table->txn_table) {
150         return NULL;
151     }
152
153     HMAP_FOR_EACH_WITH_HASH (txn_row, hmap_node,
154                              uuid_hash(uuid), &table->txn_table->txn_rows) {
155         if (uuid_equals(uuid, &txn_row->uuid)) {
156             return txn_row;
157         }
158     }
159
160     return NULL;
161 }
162
163 static struct ovsdb_txn_row *
164 find_or_make_txn_row(struct ovsdb_txn *txn, const struct ovsdb_table *table,
165                      const struct uuid *uuid)
166 {
167     struct ovsdb_txn_row *txn_row = find_txn_row(table, uuid);
168     if (!txn_row) {
169         const struct ovsdb_row *row = ovsdb_table_get_row(table, uuid);
170         if (row) {
171             txn_row = ovsdb_txn_row_modify(txn, row)->txn_row;
172         }
173     }
174     return txn_row;
175 }
176
177 static struct ovsdb_error * WARN_UNUSED_RESULT
178 ovsdb_txn_adjust_atom_refs(struct ovsdb_txn *txn, const struct ovsdb_row *r,
179                            const struct ovsdb_column *c,
180                            const struct ovsdb_base_type *base,
181                            const union ovsdb_atom *atoms, unsigned int n,
182                            int delta)
183 {
184     const struct ovsdb_table *table;
185     unsigned int i;
186
187     if (!ovsdb_base_type_is_strong_ref(base)) {
188         return NULL;
189     }
190
191     table = base->u.uuid.refTable;
192     for (i = 0; i < n; i++) {
193         const struct uuid *uuid = &atoms[i].uuid;
194         struct ovsdb_txn_row *txn_row;
195
196         if (uuid_equals(uuid, ovsdb_row_get_uuid(r))) {
197             /* Self-references don't count. */
198             continue;
199         }
200
201         txn_row = find_or_make_txn_row(txn, table, uuid);
202         if (!txn_row) {
203             return ovsdb_error("referential integrity violation",
204                                "Table %s column %s row "UUID_FMT" "
205                                "references nonexistent row "UUID_FMT" in "
206                                "table %s.",
207                                r->table->schema->name, c->name,
208                                UUID_ARGS(ovsdb_row_get_uuid(r)),
209                                UUID_ARGS(uuid), table->schema->name);
210         }
211         txn_row->n_refs += delta;
212     }
213
214     return NULL;
215 }
216
217 static struct ovsdb_error * WARN_UNUSED_RESULT
218 ovsdb_txn_adjust_row_refs(struct ovsdb_txn *txn, const struct ovsdb_row *r,
219                           const struct ovsdb_column *column, int delta)
220 {
221     const struct ovsdb_datum *field = &r->fields[column->index];
222     struct ovsdb_error *error;
223
224     error = ovsdb_txn_adjust_atom_refs(txn, r, column, &column->type.key,
225                                        field->keys, field->n, delta);
226     if (!error) {
227         error = ovsdb_txn_adjust_atom_refs(txn, r, column, &column->type.value,
228                                            field->values, field->n, delta);
229     }
230     return error;
231 }
232
233 static struct ovsdb_error * WARN_UNUSED_RESULT
234 update_row_ref_count(struct ovsdb_txn *txn, struct ovsdb_txn_row *r)
235 {
236     struct ovsdb_table *table = r->table;
237     struct shash_node *node;
238
239     SHASH_FOR_EACH (node, &table->schema->columns) {
240         const struct ovsdb_column *column = node->data;
241         struct ovsdb_error *error;
242
243         if (r->old) {
244             error = ovsdb_txn_adjust_row_refs(txn, r->old, column, -1);
245             if (error) {
246                 return OVSDB_WRAP_BUG("error decreasing refcount", error);
247             }
248         }
249         if (r->new) {
250             error = ovsdb_txn_adjust_row_refs(txn, r->new, column, 1);
251             if (error) {
252                 return error;
253             }
254         }
255     }
256
257     return NULL;
258 }
259
260 static struct ovsdb_error * WARN_UNUSED_RESULT
261 check_ref_count(struct ovsdb_txn *txn OVS_UNUSED, struct ovsdb_txn_row *r)
262 {
263     if (r->new || !r->n_refs) {
264         return NULL;
265     } else {
266         return ovsdb_error("referential integrity violation",
267                            "cannot delete %s row "UUID_FMT" because "
268                            "of %zu remaining reference(s)",
269                            r->table->schema->name, UUID_ARGS(&r->uuid),
270                            r->n_refs);
271     }
272 }
273
274 static struct ovsdb_error * WARN_UNUSED_RESULT
275 delete_row_refs(struct ovsdb_txn *txn, const struct ovsdb_row *row,
276                 const struct ovsdb_base_type *base,
277                 const union ovsdb_atom *atoms, unsigned int n)
278 {
279     const struct ovsdb_table *table;
280     unsigned int i;
281
282     if (!ovsdb_base_type_is_strong_ref(base)) {
283         return NULL;
284     }
285
286     table = base->u.uuid.refTable;
287     for (i = 0; i < n; i++) {
288         const struct uuid *uuid = &atoms[i].uuid;
289         struct ovsdb_txn_row *txn_row;
290
291         if (uuid_equals(uuid, ovsdb_row_get_uuid(row))) {
292             /* Self-references don't count. */
293             continue;
294         }
295
296         txn_row = find_or_make_txn_row(txn, table, uuid);
297         if (!txn_row) {
298             return OVSDB_BUG("strong ref target missing");
299         } else if (!txn_row->n_refs) {
300             return OVSDB_BUG("strong ref target has zero n_refs");
301         } else if (!txn_row->new) {
302             return OVSDB_BUG("deleted strong ref target");
303         }
304
305         if (--txn_row->n_refs == 0) {
306             struct ovsdb_error *error = delete_garbage_row(txn, txn_row);
307             if (error) {
308                 return error;
309             }
310         }
311     }
312
313     return NULL;
314 }
315
316 static struct ovsdb_error * WARN_UNUSED_RESULT
317 delete_garbage_row(struct ovsdb_txn *txn, struct ovsdb_txn_row *txn_row)
318 {
319     struct shash_node *node;
320     struct ovsdb_row *row;
321
322     if (txn_row->table->schema->is_root) {
323         return NULL;
324     }
325
326     row = txn_row->new;
327     txn_row->new = NULL;
328     hmap_remove(&txn_row->table->rows, &row->hmap_node);
329     SHASH_FOR_EACH (node, &txn_row->table->schema->columns) {
330         const struct ovsdb_column *column = node->data;
331         const struct ovsdb_datum *field = &row->fields[column->index];
332         struct ovsdb_error *error;
333
334         error = delete_row_refs(txn, row,
335                                 &column->type.key, field->keys, field->n);
336         if (error) {
337             return error;
338         }
339
340         error = delete_row_refs(txn, row,
341                                 &column->type.value, field->values, field->n);
342         if (error) {
343             return error;
344         }
345     }
346     ovsdb_row_destroy(row);
347
348     return NULL;
349 }
350
351 static struct ovsdb_error * WARN_UNUSED_RESULT
352 collect_garbage(struct ovsdb_txn *txn, struct ovsdb_txn_row *txn_row)
353 {
354     if (txn_row->new && !txn_row->n_refs) {
355         return delete_garbage_row(txn, txn_row);
356     }
357     return NULL;
358 }
359
360 static struct ovsdb_error * WARN_UNUSED_RESULT
361 update_ref_counts(struct ovsdb_txn *txn)
362 {
363     struct ovsdb_error *error;
364
365     error = for_each_txn_row(txn, update_row_ref_count);
366     if (error) {
367         return error;
368     }
369
370     return for_each_txn_row(txn, check_ref_count);
371 }
372
373 static struct ovsdb_error *
374 ovsdb_txn_row_commit(struct ovsdb_txn *txn OVS_UNUSED,
375                      struct ovsdb_txn_row *txn_row)
376 {
377     ovsdb_txn_row_prefree(txn_row);
378     if (txn_row->new) {
379         txn_row->new->n_refs = txn_row->n_refs;
380     }
381     ovsdb_row_destroy(txn_row->old);
382     free(txn_row);
383
384     return NULL;
385 }
386
387 static void
388 add_weak_ref(struct ovsdb_txn *txn,
389              const struct ovsdb_row *src_, const struct ovsdb_row *dst_)
390 {
391     struct ovsdb_row *src = (struct ovsdb_row *) src_;
392     struct ovsdb_row *dst = (struct ovsdb_row *) dst_;
393     struct ovsdb_weak_ref *weak;
394
395     if (src == dst) {
396         return;
397     }
398
399     dst = ovsdb_txn_row_modify(txn, dst);
400
401     if (!list_is_empty(&dst->dst_refs)) {
402         /* Omit duplicates. */
403         weak = CONTAINER_OF(list_back(&dst->dst_refs),
404                             struct ovsdb_weak_ref, dst_node);
405         if (weak->src == src) {
406             return;
407         }
408     }
409
410     weak = xmalloc(sizeof *weak);
411     weak->src = src;
412     list_push_back(&dst->dst_refs, &weak->dst_node);
413     list_push_back(&src->src_refs, &weak->src_node);
414 }
415
416 static struct ovsdb_error * WARN_UNUSED_RESULT
417 assess_weak_refs(struct ovsdb_txn *txn, struct ovsdb_txn_row *txn_row)
418 {
419     struct ovsdb_table *table;
420     struct shash_node *node;
421
422     if (txn_row->old) {
423         /* Mark rows that have weak references to 'txn_row' as modified, so
424          * that their weak references will get reassessed. */
425         struct ovsdb_weak_ref *weak, *next;
426
427         LIST_FOR_EACH_SAFE (weak, next, dst_node, &txn_row->old->dst_refs) {
428             if (!weak->src->txn_row) {
429                 ovsdb_txn_row_modify(txn, weak->src);
430             }
431         }
432     }
433
434     if (!txn_row->new) {
435         /* We don't have to do anything about references that originate at
436          * 'txn_row', because ovsdb_row_destroy() will remove those weak
437          * references. */
438         return NULL;
439     }
440
441     table = txn_row->table;
442     SHASH_FOR_EACH (node, &table->schema->columns) {
443         const struct ovsdb_column *column = node->data;
444         struct ovsdb_datum *datum = &txn_row->new->fields[column->index];
445         unsigned int orig_n, i;
446         bool zero = false;
447
448         orig_n = datum->n;
449
450         if (ovsdb_base_type_is_weak_ref(&column->type.key)) {
451             for (i = 0; i < datum->n; ) {
452                 const struct ovsdb_row *row;
453
454                 row = ovsdb_table_get_row(column->type.key.u.uuid.refTable,
455                                           &datum->keys[i].uuid);
456                 if (row) {
457                     add_weak_ref(txn, txn_row->new, row);
458                     i++;
459                 } else {
460                     if (uuid_is_zero(&datum->keys[i].uuid)) {
461                         zero = true;
462                     }
463                     ovsdb_datum_remove_unsafe(datum, i, &column->type);
464                 }
465             }
466         }
467
468         if (ovsdb_base_type_is_weak_ref(&column->type.value)) {
469             for (i = 0; i < datum->n; ) {
470                 const struct ovsdb_row *row;
471
472                 row = ovsdb_table_get_row(column->type.value.u.uuid.refTable,
473                                           &datum->values[i].uuid);
474                 if (row) {
475                     add_weak_ref(txn, txn_row->new, row);
476                     i++;
477                 } else {
478                     if (uuid_is_zero(&datum->values[i].uuid)) {
479                         zero = true;
480                     }
481                     ovsdb_datum_remove_unsafe(datum, i, &column->type);
482                 }
483             }
484         }
485
486         if (datum->n != orig_n) {
487             bitmap_set1(txn_row->changed, column->index);
488             ovsdb_datum_sort_assert(datum, column->type.key.type);
489             if (datum->n < column->type.n_min) {
490                 const struct uuid *row_uuid = ovsdb_row_get_uuid(txn_row->new);
491                 if (zero && !txn_row->old) {
492                     return ovsdb_error(
493                         "constraint violation",
494                         "Weak reference column \"%s\" in \"%s\" row "UUID_FMT
495                         " (inserted within this transaction) contained "
496                         "all-zeros UUID (probably as the default value for "
497                         "this column) but deleting this value caused a "
498                         "constraint volation because this column is not "
499                         "allowed to be empty.", column->name,
500                         table->schema->name, UUID_ARGS(row_uuid));
501                 } else {
502                     return ovsdb_error(
503                         "constraint violation",
504                         "Deletion of %u weak reference(s) to deleted (or "
505                         "never-existing) rows from column \"%s\" in \"%s\" "
506                         "row "UUID_FMT" %scaused this column to become empty, "
507                         "but constraints on this column disallow an "
508                         "empty column.",
509                         orig_n - datum->n, column->name, table->schema->name,
510                         UUID_ARGS(row_uuid),
511                         (txn_row->old
512                          ? ""
513                          : "(inserted within this transaction) "));
514                 }
515             }
516         }
517     }
518
519     return NULL;
520 }
521
522 static struct ovsdb_error * WARN_UNUSED_RESULT
523 determine_changes(struct ovsdb_txn *txn, struct ovsdb_txn_row *txn_row)
524 {
525     struct ovsdb_table *table = txn_row->table;
526
527     if (txn_row->old && txn_row->new) {
528         struct shash_node *node;
529         bool changed = false;
530
531         SHASH_FOR_EACH (node, &table->schema->columns) {
532             const struct ovsdb_column *column = node->data;
533             const struct ovsdb_type *type = &column->type;
534             unsigned int idx = column->index;
535
536             if (!ovsdb_datum_equals(&txn_row->old->fields[idx],
537                                     &txn_row->new->fields[idx],
538                                     type)) {
539                 bitmap_set1(txn_row->changed, idx);
540                 changed = true;
541             }
542         }
543
544         if (!changed) {
545             /* Nothing actually changed in this row, so drop it. */
546             ovsdb_txn_row_abort(txn, txn_row);
547         }
548     } else {
549         bitmap_set_multiple(txn_row->changed, 0,
550                             shash_count(&table->schema->columns), 1);
551     }
552
553     return NULL;
554 }
555
556 static struct ovsdb_error * WARN_UNUSED_RESULT
557 check_max_rows(struct ovsdb_txn *txn)
558 {
559     struct ovsdb_txn_table *t;
560
561     LIST_FOR_EACH (t, node, &txn->txn_tables) {
562         size_t n_rows = hmap_count(&t->table->rows);
563         unsigned int max_rows = t->table->schema->max_rows;
564
565         if (n_rows > max_rows) {
566             return ovsdb_error("constraint violation",
567                                "transaction causes \"%s\" table to contain "
568                                "%zu rows, greater than the schema-defined "
569                                "limit of %u row(s)",
570                                t->table->schema->name, n_rows, max_rows);
571         }
572     }
573
574     return NULL;
575 }
576
577 struct ovsdb_error *
578 ovsdb_txn_commit(struct ovsdb_txn *txn, bool durable)
579 {
580     struct ovsdb_replica *replica;
581     struct ovsdb_error *error;
582
583     /* Figure out what actually changed, and abort early if the transaction
584      * was really a no-op. */
585     error = for_each_txn_row(txn, determine_changes);
586     if (error) {
587         return OVSDB_WRAP_BUG("can't happen", error);
588     }
589     if (list_is_empty(&txn->txn_tables)) {
590         ovsdb_txn_abort(txn);
591         return NULL;
592     }
593
594     /* Update reference counts and check referential integrity. */
595     error = update_ref_counts(txn);
596     if (error) {
597         ovsdb_txn_abort(txn);
598         return error;
599     }
600
601     /* Delete unreferenced, non-root rows. */
602     error = for_each_txn_row(txn, collect_garbage);
603     if (error) {
604         ovsdb_txn_abort(txn);
605         return OVSDB_WRAP_BUG("can't happen", error);
606     }
607
608     /* Check maximum rows table constraints. */
609     error = check_max_rows(txn);
610     if (error) {
611         ovsdb_txn_abort(txn);
612         return error;
613     }
614
615     /* Check reference counts and remove bad reference for "weak" referential
616      * integrity. */
617     error = for_each_txn_row(txn, assess_weak_refs);
618     if (error) {
619         ovsdb_txn_abort(txn);
620         return error;
621     }
622
623     /* Send the commit to each replica. */
624     LIST_FOR_EACH (replica, node, &txn->db->replicas) {
625         error = (replica->class->commit)(replica, txn, durable);
626         if (error) {
627             /* We don't support two-phase commit so only the first replica is
628              * allowed to report an error. */
629             assert(&replica->node == txn->db->replicas.next);
630
631             ovsdb_txn_abort(txn);
632             return error;
633         }
634     }
635
636     /* Finalize commit. */
637     txn->db->run_triggers = true;
638     ovsdb_error_assert(for_each_txn_row(txn, ovsdb_txn_row_commit));
639     ovsdb_txn_free(txn);
640
641     return NULL;
642 }
643
644 void
645 ovsdb_txn_for_each_change(const struct ovsdb_txn *txn,
646                           ovsdb_txn_row_cb_func *cb, void *aux)
647 {
648     struct ovsdb_txn_table *t;
649     struct ovsdb_txn_row *r;
650
651     LIST_FOR_EACH (t, node, &txn->txn_tables) {
652         HMAP_FOR_EACH (r, hmap_node, &t->txn_rows) {
653             if ((r->old || r->new) && !cb(r->old, r->new, r->changed, aux)) {
654                 break;
655             }
656         }
657    }
658 }
659
660 static struct ovsdb_txn_table *
661 ovsdb_txn_create_txn_table(struct ovsdb_txn *txn, struct ovsdb_table *table)
662 {
663     if (!table->txn_table) {
664         struct ovsdb_txn_table *txn_table;
665
666         table->txn_table = txn_table = xmalloc(sizeof *table->txn_table);
667         txn_table->table = table;
668         hmap_init(&txn_table->txn_rows);
669         txn_table->serial = serial - 1;
670         list_push_back(&txn->txn_tables, &txn_table->node);
671     }
672     return table->txn_table;
673 }
674
675 static struct ovsdb_txn_row *
676 ovsdb_txn_row_create(struct ovsdb_txn *txn, struct ovsdb_table *table,
677                      const struct ovsdb_row *old_, struct ovsdb_row *new)
678 {
679     const struct ovsdb_row *row = old_ ? old_ : new;
680     struct ovsdb_row *old = (struct ovsdb_row *) old_;
681     size_t n_columns = shash_count(&table->schema->columns);
682     struct ovsdb_txn_table *txn_table;
683     struct ovsdb_txn_row *txn_row;
684
685     txn_row = xzalloc(offsetof(struct ovsdb_txn_row, changed)
686                       + bitmap_n_bytes(n_columns));
687     txn_row->uuid = *ovsdb_row_get_uuid(row);
688     txn_row->table = row->table;
689     txn_row->old = old;
690     txn_row->new = new;
691     txn_row->n_refs = old ? old->n_refs : 0;
692     txn_row->serial = serial - 1;
693
694     if (old) {
695         old->txn_row = txn_row;
696     }
697     if (new) {
698         new->txn_row = txn_row;
699     }
700
701     txn_table = ovsdb_txn_create_txn_table(txn, table);
702     hmap_insert(&txn_table->txn_rows, &txn_row->hmap_node,
703                 ovsdb_row_hash(old ? old : new));
704
705     return txn_row;
706 }
707
708 struct ovsdb_row *
709 ovsdb_txn_row_modify(struct ovsdb_txn *txn, const struct ovsdb_row *ro_row_)
710 {
711     struct ovsdb_row *ro_row = (struct ovsdb_row *) ro_row_;
712
713     if (ro_row->txn_row) {
714         assert(ro_row == ro_row->txn_row->new);
715         return ro_row;
716     } else {
717         struct ovsdb_table *table = ro_row->table;
718         struct ovsdb_row *rw_row;
719
720         rw_row = ovsdb_row_clone(ro_row);
721         rw_row->n_refs = ro_row->n_refs;
722         uuid_generate(ovsdb_row_get_version_rw(rw_row));
723         ovsdb_txn_row_create(txn, table, ro_row, rw_row);
724         hmap_replace(&table->rows, &ro_row->hmap_node, &rw_row->hmap_node);
725
726         return rw_row;
727     }
728 }
729
730 void
731 ovsdb_txn_row_insert(struct ovsdb_txn *txn, struct ovsdb_row *row)
732 {
733     uint32_t hash = ovsdb_row_hash(row);
734     struct ovsdb_table *table = row->table;
735
736     uuid_generate(ovsdb_row_get_version_rw(row));
737
738     ovsdb_txn_row_create(txn, table, NULL, row);
739     hmap_insert(&table->rows, &row->hmap_node, hash);
740 }
741
742 /* 'row' must be assumed destroyed upon return; the caller must not reference
743  * it again. */
744 void
745 ovsdb_txn_row_delete(struct ovsdb_txn *txn, const struct ovsdb_row *row_)
746 {
747     struct ovsdb_row *row = (struct ovsdb_row *) row_;
748     struct ovsdb_table *table = row->table;
749     struct ovsdb_txn_row *txn_row = row->txn_row;
750
751     hmap_remove(&table->rows, &row->hmap_node);
752
753     if (!txn_row) {
754         ovsdb_txn_row_create(txn, table, row, NULL);
755     } else {
756         assert(txn_row->new == row);
757         if (txn_row->old) {
758             txn_row->new = NULL;
759         } else {
760             hmap_remove(&table->txn_table->txn_rows, &txn_row->hmap_node);
761             free(txn_row);
762         }
763         ovsdb_row_destroy(row);
764     }
765 }
766
767 void
768 ovsdb_txn_add_comment(struct ovsdb_txn *txn, const char *s)
769 {
770     if (txn->comment.length) {
771         ds_put_char(&txn->comment, '\n');
772     }
773     ds_put_cstr(&txn->comment, s);
774 }
775
776 const char *
777 ovsdb_txn_get_comment(const struct ovsdb_txn *txn)
778 {
779     return txn->comment.length ? ds_cstr_ro(&txn->comment) : NULL;
780 }
781 \f
782 static void
783 ovsdb_txn_row_prefree(struct ovsdb_txn_row *txn_row)
784 {
785     struct ovsdb_txn_table *txn_table = txn_row->table->txn_table;
786
787     txn_table->n_processed--;
788     hmap_remove(&txn_table->txn_rows, &txn_row->hmap_node);
789
790     if (txn_row->old) {
791         txn_row->old->txn_row = NULL;
792     }
793     if (txn_row->new) {
794         txn_row->new->txn_row = NULL;
795     }
796 }
797
798 static void
799 ovsdb_txn_table_destroy(struct ovsdb_txn_table *txn_table)
800 {
801     assert(hmap_is_empty(&txn_table->txn_rows));
802     txn_table->table->txn_table = NULL;
803     hmap_destroy(&txn_table->txn_rows);
804     list_remove(&txn_table->node);
805     free(txn_table);
806 }
807
808 /* Calls 'cb' for every txn_row within 'txn'.  If 'cb' returns nonnull, this
809  * aborts the iteration and for_each_txn_row() passes the error up.  Otherwise,
810  * returns a null pointer after iteration is complete.
811  *
812  * 'cb' may insert new txn_rows and new txn_tables into 'txn'.  It may delete
813  * the txn_row that it is passed in, or txn_rows in txn_tables other than the
814  * one passed to 'cb'.  It may *not* delete txn_rows other than the one passed
815  * in within the same txn_table.  It may *not* delete any txn_tables.  As long
816  * as these rules are followed, 'cb' will be called exactly once for each
817  * txn_row in 'txn', even those added by 'cb'.
818  *
819  * (Even though 'cb' is not allowed to delete some txn_rows, it can still
820  * delete any actual row by clearing a txn_row's 'new' member.)
821  */
822 static struct ovsdb_error * WARN_UNUSED_RESULT
823 for_each_txn_row(struct ovsdb_txn *txn,
824                  struct ovsdb_error *(*cb)(struct ovsdb_txn *,
825                                            struct ovsdb_txn_row *))
826 {
827     bool any_work;
828
829     serial++;
830
831     do {
832         struct ovsdb_txn_table *t, *next_txn_table;
833
834         any_work = false;
835         LIST_FOR_EACH_SAFE (t, next_txn_table, node, &txn->txn_tables) {
836             if (t->serial != serial) {
837                 t->serial = serial;
838                 t->n_processed = 0;
839             }
840
841             while (t->n_processed < hmap_count(&t->txn_rows)) {
842                 struct ovsdb_txn_row *r, *next_txn_row;
843
844                 HMAP_FOR_EACH_SAFE (r, next_txn_row, hmap_node, &t->txn_rows) {
845                     if (r->serial != serial) {
846                         struct ovsdb_error *error;
847
848                         r->serial = serial;
849                         t->n_processed++;
850                         any_work = true;
851
852                         error = cb(txn, r);
853                         if (error) {
854                             return error;
855                         }
856                     }
857                 }
858             }
859             if (hmap_is_empty(&t->txn_rows)) {
860                 /* Table is empty.  Drop it. */
861                 ovsdb_txn_table_destroy(t);
862             }
863         }
864     } while (any_work);
865
866     return NULL;
867 }