idl: Gracefully handle destroying a transaction before receiving its reply.
[sliver-openvswitch.git] / lib / ovsdb-idl.c
1 /* Copyright (c) 2009, 2010 Nicira Networks.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "ovsdb-idl.h"
19
20 #include <assert.h>
21 #include <errno.h>
22 #include <inttypes.h>
23 #include <limits.h>
24 #include <stdlib.h>
25
26 #include "bitmap.h"
27 #include "dynamic-string.h"
28 #include "json.h"
29 #include "jsonrpc.h"
30 #include "ovsdb-data.h"
31 #include "ovsdb-error.h"
32 #include "ovsdb-idl-provider.h"
33 #include "poll-loop.h"
34 #include "shash.h"
35 #include "util.h"
36
37 #define THIS_MODULE VLM_ovsdb_idl
38 #include "vlog.h"
39
40 /* An arc from one idl_row to another.  When row A contains a UUID that
41  * references row B, this is represented by an arc from A (the source) to B
42  * (the destination).
43  *
44  * Arcs from a row to itself are omitted, that is, src and dst are always
45  * different.
46  *
47  * Arcs are never duplicated, that is, even if there are multiple references
48  * from A to B, there is only a single arc from A to B.
49  *
50  * Arcs are directed: an arc from A to B is the converse of an an arc from B to
51  * A.  Both an arc and its converse may both be present, if each row refers
52  * to the other circularly.
53  *
54  * The source and destination row may be in the same table or in different
55  * tables.
56  */
57 struct ovsdb_idl_arc {
58     struct list src_node;       /* In src->src_arcs list. */
59     struct list dst_node;       /* In dst->dst_arcs list. */
60     struct ovsdb_idl_row *src;  /* Source row. */
61     struct ovsdb_idl_row *dst;  /* Destination row. */
62 };
63
64 struct ovsdb_idl {
65     const struct ovsdb_idl_class *class;
66     struct jsonrpc_session *session;
67     struct shash table_by_name;
68     struct ovsdb_idl_table *tables;
69     struct json *monitor_request_id;
70     unsigned int last_monitor_request_seqno;
71     unsigned int change_seqno;
72
73     /* Transaction support. */
74     struct ovsdb_idl_txn *txn;
75     struct hmap outstanding_txns;
76 };
77
78 struct ovsdb_idl_txn {
79     struct hmap_node hmap_node;
80     struct json *request_id;
81     struct ovsdb_idl *idl;
82     struct hmap txn_rows;
83     enum ovsdb_idl_txn_status status;
84     bool dry_run;
85     struct ds comment;
86
87     /* Increments. */
88     char *inc_table;
89     char *inc_column;
90     struct json *inc_where;
91     unsigned int inc_index;
92     int64_t inc_new_value;
93 };
94
95 static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);
96 static struct vlog_rate_limit semantic_rl = VLOG_RATE_LIMIT_INIT(1, 5);
97
98 static void ovsdb_idl_clear(struct ovsdb_idl *);
99 static void ovsdb_idl_send_monitor_request(struct ovsdb_idl *);
100 static void ovsdb_idl_parse_update(struct ovsdb_idl *, const struct json *);
101 static struct ovsdb_error *ovsdb_idl_parse_update__(struct ovsdb_idl *,
102                                                     const struct json *);
103 static void ovsdb_idl_process_update(struct ovsdb_idl_table *,
104                                      const struct uuid *,
105                                      const struct json *old,
106                                      const struct json *new);
107 static void ovsdb_idl_insert_row(struct ovsdb_idl_row *, const struct json *);
108 static void ovsdb_idl_delete_row(struct ovsdb_idl_row *);
109 static void ovsdb_idl_modify_row(struct ovsdb_idl_row *, const struct json *);
110
111 static bool ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *);
112 static struct ovsdb_idl_row *ovsdb_idl_row_create__(
113     const struct ovsdb_idl_table_class *);
114 static struct ovsdb_idl_row *ovsdb_idl_row_create(struct ovsdb_idl_table *,
115                                                   const struct uuid *);
116 static void ovsdb_idl_row_destroy(struct ovsdb_idl_row *);
117
118 static void ovsdb_idl_row_clear_old(struct ovsdb_idl_row *);
119 static void ovsdb_idl_row_clear_new(struct ovsdb_idl_row *);
120
121 static void ovsdb_idl_txn_abort_all(struct ovsdb_idl *);
122 static bool ovsdb_idl_txn_process_reply(struct ovsdb_idl *,
123                                         const struct jsonrpc_msg *msg);
124
125 struct ovsdb_idl *
126 ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class)
127 {
128     struct ovsdb_idl *idl;
129     size_t i;
130
131     idl = xzalloc(sizeof *idl);
132     idl->class = class;
133     idl->session = jsonrpc_session_open(remote);
134     shash_init(&idl->table_by_name);
135     idl->tables = xmalloc(class->n_tables * sizeof *idl->tables);
136     for (i = 0; i < class->n_tables; i++) {
137         const struct ovsdb_idl_table_class *tc = &class->tables[i];
138         struct ovsdb_idl_table *table = &idl->tables[i];
139         size_t j;
140
141         assert(!shash_find(&idl->table_by_name, tc->name));
142         shash_add(&idl->table_by_name, tc->name, table);
143         table->class = tc;
144         shash_init(&table->columns);
145         for (j = 0; j < tc->n_columns; j++) {
146             const struct ovsdb_idl_column *column = &tc->columns[j];
147
148             assert(!shash_find(&table->columns, column->name));
149             shash_add(&table->columns, column->name, column);
150         }
151         hmap_init(&table->rows);
152         table->idl = idl;
153     }
154     idl->last_monitor_request_seqno = UINT_MAX;
155     hmap_init(&idl->outstanding_txns);
156
157     return idl;
158 }
159
160 void
161 ovsdb_idl_destroy(struct ovsdb_idl *idl)
162 {
163     if (idl) {
164         size_t i;
165
166         assert(!idl->txn);
167         ovsdb_idl_clear(idl);
168         jsonrpc_session_close(idl->session);
169
170         for (i = 0; i < idl->class->n_tables; i++) {
171             struct ovsdb_idl_table *table = &idl->tables[i];
172             shash_destroy(&table->columns);
173             hmap_destroy(&table->rows);
174         }
175         shash_destroy(&idl->table_by_name);
176         free(idl->tables);
177         json_destroy(idl->monitor_request_id);
178         free(idl);
179     }
180 }
181
182 static void
183 ovsdb_idl_clear(struct ovsdb_idl *idl)
184 {
185     bool changed = false;
186     size_t i;
187
188     for (i = 0; i < idl->class->n_tables; i++) {
189         struct ovsdb_idl_table *table = &idl->tables[i];
190         struct ovsdb_idl_row *row, *next_row;
191
192         if (hmap_is_empty(&table->rows)) {
193             continue;
194         }
195
196         changed = true;
197         HMAP_FOR_EACH_SAFE (row, next_row, struct ovsdb_idl_row, hmap_node,
198                             &table->rows) {
199             struct ovsdb_idl_arc *arc, *next_arc;
200
201             if (!ovsdb_idl_row_is_orphan(row)) {
202                 (row->table->class->unparse)(row);
203                 ovsdb_idl_row_clear_old(row);
204             }
205             hmap_remove(&table->rows, &row->hmap_node);
206             LIST_FOR_EACH_SAFE (arc, next_arc, struct ovsdb_idl_arc, src_node,
207                                 &row->src_arcs) {
208                 free(arc);
209             }
210             /* No need to do anything with dst_arcs: some node has those arcs
211              * as forward arcs and will destroy them itself. */
212
213             free(row);
214         }
215     }
216
217     if (changed) {
218         idl->change_seqno++;
219     }
220 }
221
222 void
223 ovsdb_idl_run(struct ovsdb_idl *idl)
224 {
225     int i;
226
227     assert(!idl->txn);
228     jsonrpc_session_run(idl->session);
229     for (i = 0; jsonrpc_session_is_connected(idl->session) && i < 50; i++) {
230         struct jsonrpc_msg *msg, *reply;
231         unsigned int seqno;
232
233         seqno = jsonrpc_session_get_seqno(idl->session);
234         if (idl->last_monitor_request_seqno != seqno) {
235             idl->last_monitor_request_seqno = seqno;
236             ovsdb_idl_txn_abort_all(idl);
237             ovsdb_idl_send_monitor_request(idl);
238             break;
239         }
240
241         msg = jsonrpc_session_recv(idl->session);
242         if (!msg) {
243             break;
244         }
245
246         reply = NULL;
247         if (msg->type == JSONRPC_NOTIFY
248                    && !strcmp(msg->method, "update")
249                    && msg->params->type == JSON_ARRAY
250                    && msg->params->u.array.n == 2
251                    && msg->params->u.array.elems[0]->type == JSON_NULL) {
252             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1]);
253         } else if (msg->type == JSONRPC_REPLY
254                    && idl->monitor_request_id
255                    && json_equal(idl->monitor_request_id, msg->id)) {
256             json_destroy(idl->monitor_request_id);
257             idl->monitor_request_id = NULL;
258             ovsdb_idl_clear(idl);
259             ovsdb_idl_parse_update(idl, msg->result);
260         } else if (msg->type == JSONRPC_REPLY
261                    && msg->id && msg->id->type == JSON_STRING
262                    && !strcmp(msg->id->u.string, "echo")) {
263             /* It's a reply to our echo request.  Ignore it. */
264         } else if ((msg->type == JSONRPC_ERROR
265                     || msg->type == JSONRPC_REPLY)
266                    && ovsdb_idl_txn_process_reply(idl, msg)) {
267             /* ovsdb_idl_txn_process_reply() did everything needful. */
268         } else {
269             /* This can happen if ovsdb_idl_txn_destroy() is called to destroy
270              * a transaction before we receive the reply, so keep the log level
271              * low. */
272             VLOG_DBG("%s: received unexpected %s message",
273                      jsonrpc_session_get_name(idl->session),
274                      jsonrpc_msg_type_to_string(msg->type));
275         }
276         if (reply) {
277             jsonrpc_session_send(idl->session, reply);
278         }
279         jsonrpc_msg_destroy(msg);
280     }
281 }
282
283 void
284 ovsdb_idl_wait(struct ovsdb_idl *idl)
285 {
286     jsonrpc_session_wait(idl->session);
287     jsonrpc_session_recv_wait(idl->session);
288 }
289
290 unsigned int
291 ovsdb_idl_get_seqno(const struct ovsdb_idl *idl)
292 {
293     return idl->change_seqno;
294 }
295
296 bool
297 ovsdb_idl_has_ever_connected(const struct ovsdb_idl *idl)
298 {
299     return ovsdb_idl_get_seqno(idl) != 0;
300 }
301
302 void
303 ovsdb_idl_force_reconnect(struct ovsdb_idl *idl)
304 {
305     jsonrpc_session_force_reconnect(idl->session);
306 }
307 \f
308 static void
309 ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl)
310 {
311     struct json *monitor_requests;
312     struct jsonrpc_msg *msg;
313     size_t i;
314
315     monitor_requests = json_object_create();
316     for (i = 0; i < idl->class->n_tables; i++) {
317         const struct ovsdb_idl_table *table = &idl->tables[i];
318         const struct ovsdb_idl_table_class *tc = table->class;
319         struct json *monitor_request, *columns;
320         size_t i;
321
322         monitor_request = json_object_create();
323         columns = json_array_create_empty();
324         for (i = 0; i < tc->n_columns; i++) {
325             const struct ovsdb_idl_column *column = &tc->columns[i];
326             json_array_add(columns, json_string_create(column->name));
327         }
328         json_object_put(monitor_request, "columns", columns);
329         json_object_put(monitor_requests, tc->name, monitor_request);
330     }
331
332     json_destroy(idl->monitor_request_id);
333     msg = jsonrpc_create_request(
334         "monitor", json_array_create_2(json_null_create(), monitor_requests),
335         &idl->monitor_request_id);
336     jsonrpc_session_send(idl->session, msg);
337 }
338
339 static void
340 ovsdb_idl_parse_update(struct ovsdb_idl *idl, const struct json *table_updates)
341 {
342     struct ovsdb_error *error;
343
344     idl->change_seqno++;
345
346     error = ovsdb_idl_parse_update__(idl, table_updates);
347     if (error) {
348         if (!VLOG_DROP_WARN(&syntax_rl)) {
349             char *s = ovsdb_error_to_string(error);
350             VLOG_WARN_RL(&syntax_rl, "%s", s);
351             free(s);
352         }
353         ovsdb_error_destroy(error);
354     }
355 }
356
357 static struct ovsdb_error *
358 ovsdb_idl_parse_update__(struct ovsdb_idl *idl,
359                          const struct json *table_updates)
360 {
361     const struct shash_node *tables_node;
362
363     if (table_updates->type != JSON_OBJECT) {
364         return ovsdb_syntax_error(table_updates, NULL,
365                                   "<table-updates> is not an object");
366     }
367     SHASH_FOR_EACH (tables_node, json_object(table_updates)) {
368         const struct json *table_update = tables_node->data;
369         const struct shash_node *table_node;
370         struct ovsdb_idl_table *table;
371
372         table = shash_find_data(&idl->table_by_name, tables_node->name);
373         if (!table) {
374             return ovsdb_syntax_error(
375                 table_updates, NULL,
376                 "<table-updates> includes unknown table \"%s\"",
377                 tables_node->name);
378         }
379
380         if (table_update->type != JSON_OBJECT) {
381             return ovsdb_syntax_error(table_update, NULL,
382                                       "<table-update> for table \"%s\" is "
383                                       "not an object", table->class->name);
384         }
385         SHASH_FOR_EACH (table_node, json_object(table_update)) {
386             const struct json *row_update = table_node->data;
387             const struct json *old_json, *new_json;
388             struct uuid uuid;
389
390             if (!uuid_from_string(&uuid, table_node->name)) {
391                 return ovsdb_syntax_error(table_update, NULL,
392                                           "<table-update> for table \"%s\" "
393                                           "contains bad UUID "
394                                           "\"%s\" as member name",
395                                           table->class->name,
396                                           table_node->name);
397             }
398             if (row_update->type != JSON_OBJECT) {
399                 return ovsdb_syntax_error(row_update, NULL,
400                                           "<table-update> for table \"%s\" "
401                                           "contains <row-update> for %s that "
402                                           "is not an object",
403                                           table->class->name,
404                                           table_node->name);
405             }
406
407             old_json = shash_find_data(json_object(row_update), "old");
408             new_json = shash_find_data(json_object(row_update), "new");
409             if (old_json && old_json->type != JSON_OBJECT) {
410                 return ovsdb_syntax_error(old_json, NULL,
411                                           "\"old\" <row> is not object");
412             } else if (new_json && new_json->type != JSON_OBJECT) {
413                 return ovsdb_syntax_error(new_json, NULL,
414                                           "\"new\" <row> is not object");
415             } else if ((old_json != NULL) + (new_json != NULL)
416                        != shash_count(json_object(row_update))) {
417                 return ovsdb_syntax_error(row_update, NULL,
418                                           "<row-update> contains unexpected "
419                                           "member");
420             } else if (!old_json && !new_json) {
421                 return ovsdb_syntax_error(row_update, NULL,
422                                           "<row-update> missing \"old\" "
423                                           "and \"new\" members");
424             }
425
426             ovsdb_idl_process_update(table, &uuid, old_json, new_json);
427         }
428     }
429
430     return NULL;
431 }
432
433 static struct ovsdb_idl_row *
434 ovsdb_idl_get_row(struct ovsdb_idl_table *table, const struct uuid *uuid)
435 {
436     struct ovsdb_idl_row *row;
437
438     HMAP_FOR_EACH_WITH_HASH (row, struct ovsdb_idl_row, hmap_node,
439                              uuid_hash(uuid), &table->rows) {
440         if (uuid_equals(&row->uuid, uuid)) {
441             return row;
442         }
443     }
444     return NULL;
445 }
446
447 static void
448 ovsdb_idl_process_update(struct ovsdb_idl_table *table,
449                          const struct uuid *uuid, const struct json *old,
450                          const struct json *new)
451 {
452     struct ovsdb_idl_row *row;
453
454     row = ovsdb_idl_get_row(table, uuid);
455     if (!new) {
456         /* Delete row. */
457         if (row && !ovsdb_idl_row_is_orphan(row)) {
458             /* XXX perhaps we should check the 'old' values? */
459             ovsdb_idl_delete_row(row);
460         } else {
461             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
462                          "from table %s",
463                          UUID_ARGS(uuid), table->class->name);
464         }
465     } else if (!old) {
466         /* Insert row. */
467         if (!row) {
468             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
469         } else if (ovsdb_idl_row_is_orphan(row)) {
470             ovsdb_idl_insert_row(row, new);
471         } else {
472             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
473                          "table %s", UUID_ARGS(uuid), table->class->name);
474             ovsdb_idl_modify_row(row, new);
475         }
476     } else {
477         /* Modify row. */
478         if (row) {
479             /* XXX perhaps we should check the 'old' values? */
480             if (!ovsdb_idl_row_is_orphan(row)) {
481                 ovsdb_idl_modify_row(row, new);
482             } else {
483                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
484                              "referenced row "UUID_FMT" in table %s",
485                              UUID_ARGS(uuid), table->class->name);
486                 ovsdb_idl_insert_row(row, new);
487             }
488         } else {
489             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
490                          "in table %s", UUID_ARGS(uuid), table->class->name);
491             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
492         }
493     }
494 }
495
496 static void
497 ovsdb_idl_row_update(struct ovsdb_idl_row *row, const struct json *row_json)
498 {
499     struct ovsdb_idl_table *table = row->table;
500     struct shash_node *node;
501
502     SHASH_FOR_EACH (node, json_object(row_json)) {
503         const char *column_name = node->name;
504         const struct ovsdb_idl_column *column;
505         struct ovsdb_datum datum;
506         struct ovsdb_error *error;
507
508         column = shash_find_data(&table->columns, column_name);
509         if (!column) {
510             VLOG_WARN_RL(&syntax_rl, "unknown column %s updating row "UUID_FMT,
511                          column_name, UUID_ARGS(&row->uuid));
512             continue;
513         }
514
515         error = ovsdb_datum_from_json(&datum, &column->type, node->data, NULL);
516         if (!error) {
517             ovsdb_datum_swap(&row->old[column - table->class->columns],
518                              &datum);
519             ovsdb_datum_destroy(&datum, &column->type);
520         } else {
521             char *s = ovsdb_error_to_string(error);
522             VLOG_WARN_RL(&syntax_rl, "error parsing column %s in row "UUID_FMT
523                          " in table %s: %s", column_name,
524                          UUID_ARGS(&row->uuid), table->class->name, s);
525             free(s);
526             ovsdb_error_destroy(error);
527         }
528     }
529 }
530
531 static bool
532 ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *row)
533 {
534     return !row->old;
535 }
536
537 static void
538 ovsdb_idl_row_clear_old(struct ovsdb_idl_row *row)
539 {
540     assert(row->old == row->new);
541     if (!ovsdb_idl_row_is_orphan(row)) {
542         const struct ovsdb_idl_table_class *class = row->table->class;
543         size_t i;
544
545         for (i = 0; i < class->n_columns; i++) {
546             ovsdb_datum_destroy(&row->old[i], &class->columns[i].type);
547         }
548         free(row->old);
549         row->old = row->new = NULL;
550     }
551 }
552
553 static void
554 ovsdb_idl_row_clear_new(struct ovsdb_idl_row *row)
555 {
556     if (row->old != row->new) {
557         if (row->new) {
558             const struct ovsdb_idl_table_class *class = row->table->class;
559             size_t i;
560
561             BITMAP_FOR_EACH_1 (i, class->n_columns, row->written) {
562                 ovsdb_datum_destroy(&row->new[i], &class->columns[i].type);
563             }
564             free(row->new);
565             free(row->written);
566             row->written = NULL;
567         }
568         row->new = row->old;
569     }
570 }
571
572 static void
573 ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *row, bool destroy_dsts)
574 {
575     struct ovsdb_idl_arc *arc, *next;
576
577     /* Delete all forward arcs.  If 'destroy_dsts', destroy any orphaned rows
578      * that this causes to be unreferenced. */
579     LIST_FOR_EACH_SAFE (arc, next, struct ovsdb_idl_arc, src_node,
580                         &row->src_arcs) {
581         list_remove(&arc->dst_node);
582         if (destroy_dsts
583             && ovsdb_idl_row_is_orphan(arc->dst)
584             && list_is_empty(&arc->dst->dst_arcs)) {
585             ovsdb_idl_row_destroy(arc->dst);
586         }
587         free(arc);
588     }
589     list_init(&row->src_arcs);
590 }
591
592 /* Force nodes that reference 'row' to reparse. */
593 static void
594 ovsdb_idl_row_reparse_backrefs(struct ovsdb_idl_row *row, bool destroy_dsts)
595 {
596     struct ovsdb_idl_arc *arc, *next;
597
598     /* This is trickier than it looks.  ovsdb_idl_row_clear_arcs() will destroy
599      * 'arc', so we need to use the "safe" variant of list traversal.  However,
600      * calling ref->table->class->parse will add an arc equivalent to 'arc' to
601      * row->arcs.  That could be a problem for traversal, but it adds it at the
602      * beginning of the list to prevent us from stumbling upon it again.
603      *
604      * (If duplicate arcs were possible then we would need to make sure that
605      * 'next' didn't also point into 'arc''s destination, but we forbid
606      * duplicate arcs.) */
607     LIST_FOR_EACH_SAFE (arc, next, struct ovsdb_idl_arc, dst_node,
608                         &row->dst_arcs) {
609         struct ovsdb_idl_row *ref = arc->src;
610
611         (ref->table->class->unparse)(ref);
612         ovsdb_idl_row_clear_arcs(ref, destroy_dsts);
613         (ref->table->class->parse)(ref);
614     }
615 }
616
617 static struct ovsdb_idl_row *
618 ovsdb_idl_row_create__(const struct ovsdb_idl_table_class *class)
619 {
620     struct ovsdb_idl_row *row = xzalloc(class->allocation_size);
621     memset(row, 0, sizeof *row);
622     list_init(&row->src_arcs);
623     list_init(&row->dst_arcs);
624     hmap_node_nullify(&row->txn_node);
625     return row;
626 }
627
628 static struct ovsdb_idl_row *
629 ovsdb_idl_row_create(struct ovsdb_idl_table *table, const struct uuid *uuid)
630 {
631     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(table->class);
632     hmap_insert(&table->rows, &row->hmap_node, uuid_hash(uuid));
633     row->uuid = *uuid;
634     row->table = table;
635     return row;
636 }
637
638 static void
639 ovsdb_idl_row_destroy(struct ovsdb_idl_row *row)
640 {
641     if (row) {
642         ovsdb_idl_row_clear_old(row);
643         hmap_remove(&row->table->rows, &row->hmap_node);
644         free(row);
645     }
646 }
647
648 static void
649 ovsdb_idl_insert_row(struct ovsdb_idl_row *row, const struct json *row_json)
650 {
651     const struct ovsdb_idl_table_class *class = row->table->class;
652     size_t i;
653
654     assert(!row->old && !row->new);
655     row->old = row->new = xmalloc(class->n_columns * sizeof *row->old);
656     for (i = 0; i < class->n_columns; i++) {
657         ovsdb_datum_init_default(&row->old[i], &class->columns[i].type);
658     }
659     ovsdb_idl_row_update(row, row_json);
660     (class->parse)(row);
661
662     ovsdb_idl_row_reparse_backrefs(row, false);
663 }
664
665 static void
666 ovsdb_idl_delete_row(struct ovsdb_idl_row *row)
667 {
668     (row->table->class->unparse)(row);
669     ovsdb_idl_row_clear_arcs(row, true);
670     ovsdb_idl_row_clear_old(row);
671     if (list_is_empty(&row->dst_arcs)) {
672         ovsdb_idl_row_destroy(row);
673     } else {
674         ovsdb_idl_row_reparse_backrefs(row, true);
675     }
676 }
677
678 static void
679 ovsdb_idl_modify_row(struct ovsdb_idl_row *row, const struct json *row_json)
680 {
681     (row->table->class->unparse)(row);
682     ovsdb_idl_row_clear_arcs(row, true);
683     ovsdb_idl_row_update(row, row_json);
684     (row->table->class->parse)(row);
685 }
686
687 static bool
688 may_add_arc(const struct ovsdb_idl_row *src, const struct ovsdb_idl_row *dst)
689 {
690     const struct ovsdb_idl_arc *arc;
691
692     /* No self-arcs. */
693     if (src == dst) {
694         return false;
695     }
696
697     /* No duplicate arcs.
698      *
699      * We only need to test whether the first arc in dst->dst_arcs originates
700      * at 'src', since we add all of the arcs from a given source in a clump
701      * (in a single call to a row's ->parse function) and new arcs are always
702      * added at the front of the dst_arcs list. */
703     if (list_is_empty(&dst->dst_arcs)) {
704         return true;
705     }
706     arc = CONTAINER_OF(dst->dst_arcs.next, struct ovsdb_idl_arc, dst_node);
707     return arc->src != src;
708 }
709
710 static struct ovsdb_idl_table *
711 ovsdb_idl_table_from_class(const struct ovsdb_idl *idl,
712                            const struct ovsdb_idl_table_class *table_class)
713 {
714     return &idl->tables[table_class - idl->class->tables];
715 }
716
717 struct ovsdb_idl_row *
718 ovsdb_idl_get_row_arc(struct ovsdb_idl_row *src,
719                       struct ovsdb_idl_table_class *dst_table_class,
720                       const struct uuid *dst_uuid)
721 {
722     struct ovsdb_idl *idl = src->table->idl;
723     struct ovsdb_idl_table *dst_table;
724     struct ovsdb_idl_arc *arc;
725     struct ovsdb_idl_row *dst;
726
727     dst_table = ovsdb_idl_table_from_class(idl, dst_table_class);
728     dst = ovsdb_idl_get_row(dst_table, dst_uuid);
729     if (!dst) {
730         dst = ovsdb_idl_row_create(dst_table, dst_uuid);
731     }
732
733     /* Add a new arc, if it wouldn't be a self-arc or a duplicate arc. */
734     if (may_add_arc(src, dst)) {
735         /* The arc *must* be added at the front of the dst_arcs list.  See
736          * ovsdb_idl_row_reparse_backrefs() for details. */
737         arc = xmalloc(sizeof *arc);
738         list_push_front(&src->src_arcs, &arc->src_node);
739         list_push_front(&dst->dst_arcs, &arc->dst_node);
740         arc->src = src;
741         arc->dst = dst;
742     }
743
744     return !ovsdb_idl_row_is_orphan(dst) ? dst : NULL;
745 }
746
747 static struct ovsdb_idl_row *
748 next_real_row(struct ovsdb_idl_table *table, struct hmap_node *node)
749 {
750     for (; node; node = hmap_next(&table->rows, node)) {
751         struct ovsdb_idl_row *row;
752
753         row = CONTAINER_OF(node, struct ovsdb_idl_row, hmap_node);
754         if (!ovsdb_idl_row_is_orphan(row)) {
755             return row;
756         }
757     }
758     return NULL;
759 }
760
761 struct ovsdb_idl_row *
762 ovsdb_idl_first_row(const struct ovsdb_idl *idl,
763                     const struct ovsdb_idl_table_class *table_class)
764 {
765     struct ovsdb_idl_table *table
766         = ovsdb_idl_table_from_class(idl, table_class);
767     return next_real_row(table, hmap_first(&table->rows));
768 }
769
770 struct ovsdb_idl_row *
771 ovsdb_idl_next_row(const struct ovsdb_idl_row *row)
772 {
773     struct ovsdb_idl_table *table = row->table;
774
775     return next_real_row(table, hmap_next(&table->rows, &row->hmap_node));
776 }
777 \f
778 /* Transactions. */
779
780 static void ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
781                                    enum ovsdb_idl_txn_status);
782
783 const char *
784 ovsdb_idl_txn_status_to_string(enum ovsdb_idl_txn_status status)
785 {
786     switch (status) {
787     case TXN_UNCHANGED:
788         return "unchanged";
789     case TXN_INCOMPLETE:
790         return "incomplete";
791     case TXN_ABORTED:
792         return "aborted";
793     case TXN_SUCCESS:
794         return "success";
795     case TXN_TRY_AGAIN:
796         return "try again";
797     case TXN_ERROR:
798         return "error";
799     }
800     return "<unknown>";
801 }
802
803 struct ovsdb_idl_txn *
804 ovsdb_idl_txn_create(struct ovsdb_idl *idl)
805 {
806     struct ovsdb_idl_txn *txn;
807
808     assert(!idl->txn);
809     idl->txn = txn = xmalloc(sizeof *txn);
810     txn->idl = idl;
811     txn->status = TXN_INCOMPLETE;
812     hmap_init(&txn->txn_rows);
813     txn->dry_run = false;
814     ds_init(&txn->comment);
815     txn->inc_table = NULL;
816     txn->inc_column = NULL;
817     txn->inc_where = NULL;
818     return txn;
819 }
820
821 void
822 ovsdb_idl_txn_add_comment(struct ovsdb_idl_txn *txn, const char *s)
823 {
824     if (txn->comment.length) {
825         ds_put_char(&txn->comment, '\n');
826     }
827     ds_put_cstr(&txn->comment, s);
828 }
829
830 void
831 ovsdb_idl_txn_set_dry_run(struct ovsdb_idl_txn *txn)
832 {
833     txn->dry_run = true;
834 }
835
836 void
837 ovsdb_idl_txn_increment(struct ovsdb_idl_txn *txn, const char *table,
838                         const char *column, const struct json *where)
839 {
840     assert(!txn->inc_table);
841     txn->inc_table = xstrdup(table);
842     txn->inc_column = xstrdup(column);
843     txn->inc_where = where ? json_clone(where) : json_array_create_empty();
844 }
845
846 void
847 ovsdb_idl_txn_destroy(struct ovsdb_idl_txn *txn)
848 {
849     if (txn->status == TXN_INCOMPLETE) {
850         hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
851     }
852     ovsdb_idl_txn_abort(txn);
853     ds_destroy(&txn->comment);
854     free(txn->inc_table);
855     free(txn->inc_column);
856     json_destroy(txn->inc_where);
857     free(txn);
858 }
859
860 void
861 ovsdb_idl_txn_wait(const struct ovsdb_idl_txn *txn)
862 {
863     if (txn->status != TXN_INCOMPLETE) {
864         poll_immediate_wake();
865     }
866 }
867
868 static struct json *
869 where_uuid_equals(const struct uuid *uuid)
870 {
871     return
872         json_array_create_1(
873             json_array_create_3(
874                 json_string_create("_uuid"),
875                 json_string_create("=="),
876                 json_array_create_2(
877                     json_string_create("uuid"),
878                     json_string_create_nocopy(
879                         xasprintf(UUID_FMT, UUID_ARGS(uuid))))));
880 }
881
882 static char *
883 uuid_name_from_uuid(const struct uuid *uuid)
884 {
885     char *name;
886     char *p;
887
888     name = xasprintf("row"UUID_FMT, UUID_ARGS(uuid));
889     for (p = name; *p != '\0'; p++) {
890         if (*p == '-') {
891             *p = '_';
892         }
893     }
894
895     return name;
896 }
897
898 static const struct ovsdb_idl_row *
899 ovsdb_idl_txn_get_row(const struct ovsdb_idl_txn *txn, const struct uuid *uuid)
900 {
901     const struct ovsdb_idl_row *row;
902
903     HMAP_FOR_EACH_WITH_HASH (row, struct ovsdb_idl_row, txn_node,
904                              uuid_hash(uuid), &txn->txn_rows) {
905         if (uuid_equals(&row->uuid, uuid)) {
906             return row;
907         }
908     }
909     return NULL;
910 }
911
912 /* XXX there must be a cleaner way to do this */
913 static struct json *
914 substitute_uuids(struct json *json, const struct ovsdb_idl_txn *txn)
915 {
916     if (json->type == JSON_ARRAY) {
917         struct uuid uuid;
918         size_t i;
919
920         if (json->u.array.n == 2
921             && json->u.array.elems[0]->type == JSON_STRING
922             && json->u.array.elems[1]->type == JSON_STRING
923             && !strcmp(json->u.array.elems[0]->u.string, "uuid")
924             && uuid_from_string(&uuid, json->u.array.elems[1]->u.string)) {
925             const struct ovsdb_idl_row *row;
926
927             row = ovsdb_idl_txn_get_row(txn, &uuid);
928             if (row && !row->old && row->new) {
929                 json_destroy(json);
930
931                 return json_array_create_2(
932                     json_string_create("named-uuid"),
933                     json_string_create_nocopy(uuid_name_from_uuid(&uuid)));
934             }
935         }
936
937         for (i = 0; i < json->u.array.n; i++) {
938             json->u.array.elems[i] = substitute_uuids(json->u.array.elems[i],
939                                                       txn);
940         }
941     } else if (json->type == JSON_OBJECT) {
942         struct shash_node *node;
943
944         SHASH_FOR_EACH (node, json_object(json)) {
945             node->data = substitute_uuids(node->data, txn);
946         }
947     }
948     return json;
949 }
950
951 static void
952 ovsdb_idl_txn_disassemble(struct ovsdb_idl_txn *txn)
953 {
954     struct ovsdb_idl_row *row, *next;
955
956     HMAP_FOR_EACH_SAFE (row, next, struct ovsdb_idl_row, txn_node,
957                         &txn->txn_rows) {
958         if (row->old && row->written) {
959             (row->table->class->unparse)(row);
960             ovsdb_idl_row_clear_arcs(row, false);
961             (row->table->class->parse)(row);
962         }
963         ovsdb_idl_row_clear_new(row);
964
965         free(row->prereqs);
966         row->prereqs = NULL;
967
968         free(row->written);
969         row->written = NULL;
970
971         hmap_remove(&txn->txn_rows, &row->txn_node);
972         hmap_node_nullify(&row->txn_node);
973     }
974     hmap_destroy(&txn->txn_rows);
975     hmap_init(&txn->txn_rows);
976 }
977
978 enum ovsdb_idl_txn_status
979 ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
980 {
981     struct ovsdb_idl_row *row;
982     struct json *operations;
983     bool any_updates;
984
985     if (txn != txn->idl->txn) {
986         return txn->status;
987     }
988
989     operations = json_array_create_empty();
990
991     /* Add prerequisites and declarations of new rows. */
992     HMAP_FOR_EACH (row, struct ovsdb_idl_row, txn_node, &txn->txn_rows) {
993         /* XXX check that deleted rows exist even if no prereqs? */
994         if (row->prereqs) {
995             const struct ovsdb_idl_table_class *class = row->table->class;
996             size_t n_columns = class->n_columns;
997             struct json *op, *columns, *row_json;
998             size_t idx;
999
1000             op = json_object_create();
1001             json_array_add(operations, op);
1002             json_object_put_string(op, "op", "wait");
1003             json_object_put_string(op, "table", class->name);
1004             json_object_put(op, "timeout", json_integer_create(0));
1005             json_object_put(op, "where", where_uuid_equals(&row->uuid));
1006             json_object_put_string(op, "until", "==");
1007             columns = json_array_create_empty();
1008             json_object_put(op, "columns", columns);
1009             row_json = json_object_create();
1010             json_object_put(op, "rows", json_array_create_1(row_json));
1011
1012             BITMAP_FOR_EACH_1 (idx, n_columns, row->prereqs) {
1013                 const struct ovsdb_idl_column *column = &class->columns[idx];
1014                 json_array_add(columns, json_string_create(column->name));
1015                 json_object_put(row_json, column->name,
1016                                 ovsdb_datum_to_json(&row->old[idx],
1017                                                     &column->type));
1018             }
1019         }
1020         if (row->new && !row->old) {
1021             struct json *op;
1022
1023             op = json_object_create();
1024             json_array_add(operations, op);
1025             json_object_put_string(op, "op", "declare");
1026             json_object_put(op, "uuid-name",
1027                             json_string_create_nocopy(
1028                                 uuid_name_from_uuid(&row->uuid)));
1029         }
1030     }
1031
1032     /* Add updates. */
1033     any_updates = false;
1034     HMAP_FOR_EACH (row, struct ovsdb_idl_row, txn_node, &txn->txn_rows) {
1035         const struct ovsdb_idl_table_class *class = row->table->class;
1036
1037         if (row->old == row->new) {
1038             continue;
1039         } else if (!row->new) {
1040             struct json *op = json_object_create();
1041             json_object_put_string(op, "op", "delete");
1042             json_object_put_string(op, "table", class->name);
1043             json_object_put(op, "where", where_uuid_equals(&row->uuid));
1044             json_array_add(operations, op);
1045             any_updates = true;
1046         } else {
1047             struct json *row_json;
1048             struct json *op;
1049             size_t idx;
1050
1051             op = json_object_create();
1052             json_object_put_string(op, "op", row->old ? "update" : "insert");
1053             json_object_put_string(op, "table", class->name);
1054             if (row->old) {
1055                 json_object_put(op, "where", where_uuid_equals(&row->uuid));
1056             } else {
1057                 json_object_put(op, "uuid-name",
1058                                 json_string_create_nocopy(
1059                                     uuid_name_from_uuid(&row->uuid)));
1060             }
1061             row_json = json_object_create();
1062             json_object_put(op, "row", row_json);
1063
1064             BITMAP_FOR_EACH_1 (idx, class->n_columns, row->written) {
1065                 const struct ovsdb_idl_column *column = &class->columns[idx];
1066
1067                 if (!row->old || !ovsdb_datum_equals(&row->old[idx],
1068                                                      &row->new[idx],
1069                                                      &column->type)) {
1070                     json_object_put(row_json, column->name,
1071                                     substitute_uuids(
1072                                         ovsdb_datum_to_json(&row->new[idx],
1073                                                             &column->type),
1074                                         txn));
1075                 }
1076             }
1077
1078             if (!row->old || !shash_is_empty(json_object(row_json))) {
1079                 json_array_add(operations, op);
1080                 any_updates = true;
1081             } else {
1082                 json_destroy(op);
1083             }
1084         }
1085     }
1086
1087     /* Add increment. */
1088     if (txn->inc_table && any_updates) {
1089         struct json *op;
1090
1091         txn->inc_index = operations->u.array.n;
1092
1093         op = json_object_create();
1094         json_object_put_string(op, "op", "mutate");
1095         json_object_put_string(op, "table", txn->inc_table);
1096         json_object_put(op, "where",
1097                         substitute_uuids(json_clone(txn->inc_where), txn));
1098         json_object_put(op, "mutations",
1099                         json_array_create_1(
1100                             json_array_create_3(
1101                                 json_string_create(txn->inc_column),
1102                                 json_string_create("+="),
1103                                 json_integer_create(1))));
1104         json_array_add(operations, op);
1105
1106         op = json_object_create();
1107         json_object_put_string(op, "op", "select");
1108         json_object_put_string(op, "table", txn->inc_table);
1109         json_object_put(op, "where",
1110                         substitute_uuids(json_clone(txn->inc_where), txn));
1111         json_object_put(op, "columns",
1112                         json_array_create_1(json_string_create(
1113                                                 txn->inc_column)));
1114         json_array_add(operations, op);
1115     }
1116
1117     if (txn->comment.length) {
1118         struct json *op = json_object_create();
1119         json_object_put_string(op, "op", "comment");
1120         json_object_put_string(op, "comment", ds_cstr(&txn->comment));
1121         json_array_add(operations, op);
1122     }
1123
1124     if (txn->dry_run) {
1125         struct json *op = json_object_create();
1126         json_object_put_string(op, "op", "abort");
1127         json_array_add(operations, op);
1128     }
1129
1130     if (!any_updates) {
1131         txn->status = TXN_UNCHANGED;
1132         json_destroy(operations);
1133     } else if (!jsonrpc_session_send(
1134                    txn->idl->session,
1135                    jsonrpc_create_request(
1136                        "transact", operations, &txn->request_id))) {
1137         hmap_insert(&txn->idl->outstanding_txns, &txn->hmap_node,
1138                     json_hash(txn->request_id, 0));
1139     } else {
1140         txn->status = TXN_INCOMPLETE;
1141     }
1142
1143     txn->idl->txn = NULL;
1144     ovsdb_idl_txn_disassemble(txn);
1145     return txn->status;
1146 }
1147
1148 int64_t
1149 ovsdb_idl_txn_get_increment_new_value(const struct ovsdb_idl_txn *txn)
1150 {
1151     assert(txn->status == TXN_SUCCESS);
1152     return txn->inc_new_value;
1153 }
1154
1155 void
1156 ovsdb_idl_txn_abort(struct ovsdb_idl_txn *txn)
1157 {
1158     ovsdb_idl_txn_disassemble(txn);
1159     if (txn->status == TXN_INCOMPLETE) {
1160         txn->status = TXN_ABORTED;
1161     }
1162 }
1163
1164 static void
1165 ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
1166                        enum ovsdb_idl_txn_status status)
1167 {
1168     txn->status = status;
1169     hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
1170 }
1171
1172 void
1173 ovsdb_idl_txn_write(struct ovsdb_idl_row *row,
1174                     const struct ovsdb_idl_column *column,
1175                     struct ovsdb_datum *datum)
1176 {
1177     const struct ovsdb_idl_table_class *class = row->table->class;
1178     size_t column_idx = column - class->columns;
1179
1180     assert(row->new);
1181     if (hmap_node_is_null(&row->txn_node)) {
1182         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
1183                     uuid_hash(&row->uuid));
1184     }
1185     if (row->old == row->new) {
1186         row->new = xmalloc(class->n_columns * sizeof *row->new);
1187     }
1188     if (!row->written) {
1189         row->written = bitmap_allocate(class->n_columns);
1190     }
1191     if (bitmap_is_set(row->written, column_idx)) {
1192         ovsdb_datum_destroy(&row->new[column_idx], &column->type);
1193     } else {
1194         bitmap_set1(row->written, column_idx);
1195     }
1196     row->new[column_idx] = *datum;
1197 }
1198
1199 void
1200 ovsdb_idl_txn_verify(const struct ovsdb_idl_row *row_,
1201                      const struct ovsdb_idl_column *column)
1202 {
1203     struct ovsdb_idl_row *row = (struct ovsdb_idl_row *) row_;
1204     const struct ovsdb_idl_table_class *class = row->table->class;
1205     size_t column_idx = column - class->columns;
1206
1207     assert(row->new);
1208     if (!row->old
1209         || (row->written && bitmap_is_set(row->written, column_idx))) {
1210         return;
1211     }
1212
1213     if (hmap_node_is_null(&row->txn_node)) {
1214         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
1215                     uuid_hash(&row->uuid));
1216     }
1217     if (!row->prereqs) {
1218         row->prereqs = bitmap_allocate(class->n_columns);
1219     }
1220     bitmap_set1(row->prereqs, column_idx);
1221 }
1222
1223 void
1224 ovsdb_idl_txn_delete(struct ovsdb_idl_row *row)
1225 {
1226     assert(row->new);
1227     if (!row->old) {
1228         ovsdb_idl_row_clear_new(row);
1229         assert(!row->prereqs);
1230         hmap_remove(&row->table->idl->txn->txn_rows, &row->txn_node);
1231         free(row);
1232     }
1233     if (hmap_node_is_null(&row->txn_node)) {
1234         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
1235                     uuid_hash(&row->uuid));
1236     }
1237     ovsdb_idl_row_clear_new(row);
1238     row->new = NULL;
1239 }
1240
1241 struct ovsdb_idl_row *
1242 ovsdb_idl_txn_insert(struct ovsdb_idl_txn *txn,
1243                      const struct ovsdb_idl_table_class *class)
1244 {
1245     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(class);
1246     uuid_generate(&row->uuid);
1247     row->table = ovsdb_idl_table_from_class(txn->idl, class);
1248     row->new = xmalloc(class->n_columns * sizeof *row->new);
1249     row->written = bitmap_allocate(class->n_columns);
1250     hmap_insert(&txn->txn_rows, &row->txn_node, uuid_hash(&row->uuid));
1251     return row;
1252 }
1253
1254 static void
1255 ovsdb_idl_txn_abort_all(struct ovsdb_idl *idl)
1256 {
1257     struct ovsdb_idl_txn *txn;
1258
1259     HMAP_FOR_EACH (txn, struct ovsdb_idl_txn, hmap_node,
1260                    &idl->outstanding_txns) {
1261         ovsdb_idl_txn_complete(txn, TXN_TRY_AGAIN);
1262     }
1263 }
1264
1265 static struct ovsdb_idl_txn *
1266 ovsdb_idl_txn_find(struct ovsdb_idl *idl, const struct json *id)
1267 {
1268     struct ovsdb_idl_txn *txn;
1269
1270     HMAP_FOR_EACH_WITH_HASH (txn, struct ovsdb_idl_txn, hmap_node,
1271                              json_hash(id, 0), &idl->outstanding_txns) {
1272         if (json_equal(id, txn->request_id)) {
1273             return txn;
1274         }
1275     }
1276     return NULL;
1277 }
1278
1279 static bool
1280 check_json_type(const struct json *json, enum json_type type, const char *name)
1281 {
1282     if (!json) {
1283         VLOG_WARN_RL(&syntax_rl, "%s is missing", name);
1284         return false;
1285     } else if (json->type != type) {
1286         VLOG_WARN_RL(&syntax_rl, "%s is %s instead of %s",
1287                      name, json_type_to_string(json->type),
1288                      json_type_to_string(type));
1289         return false;
1290     } else {
1291         return true;
1292     }
1293 }
1294
1295 static bool
1296 ovsdb_idl_txn_process_inc_reply(struct ovsdb_idl_txn *txn,
1297                                 const struct json_array *results)
1298 {
1299     struct json *count, *rows, *row, *column;
1300     struct shash *mutate, *select;
1301
1302     if (txn->inc_index + 2 > results->n) {
1303         VLOG_WARN_RL(&syntax_rl, "reply does not contain enough operations "
1304                      "for increment (has %u, needs %u)",
1305                      results->n, txn->inc_index + 2);
1306         return false;
1307     }
1308
1309     /* We know that this is a JSON objects because the loop in
1310      * ovsdb_idl_txn_process_reply() checked. */
1311     mutate = json_object(results->elems[txn->inc_index]);
1312     count = shash_find_data(mutate, "count");
1313     if (!check_json_type(count, JSON_INTEGER, "\"mutate\" reply \"count\"")) {
1314         return false;
1315     }
1316     if (count->u.integer != 1) {
1317         VLOG_WARN_RL(&syntax_rl,
1318                      "\"mutate\" reply \"count\" is %"PRId64" instead of 1",
1319                      count->u.integer);
1320         return false;
1321     }
1322
1323     select = json_object(results->elems[txn->inc_index + 1]);
1324     rows = shash_find_data(select, "rows");
1325     if (!check_json_type(rows, JSON_ARRAY, "\"select\" reply \"rows\"")) {
1326         return false;
1327     }
1328     if (rows->u.array.n != 1) {
1329         VLOG_WARN_RL(&syntax_rl, "\"select\" reply \"rows\" has %u elements "
1330                      "instead of 1",
1331                      rows->u.array.n);
1332         return false;
1333     }
1334     row = rows->u.array.elems[0];
1335     if (!check_json_type(row, JSON_OBJECT, "\"select\" reply row")) {
1336         return false;
1337     }
1338     column = shash_find_data(json_object(row), txn->inc_column);
1339     if (!check_json_type(column, JSON_INTEGER,
1340                          "\"select\" reply inc column")) {
1341         return false;
1342     }
1343     txn->inc_new_value = column->u.integer;
1344     return true;
1345 }
1346
1347
1348 static bool
1349 ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
1350                             const struct jsonrpc_msg *msg)
1351 {
1352     struct ovsdb_idl_txn *txn;
1353     enum ovsdb_idl_txn_status status;
1354
1355     txn = ovsdb_idl_txn_find(idl, msg->id);
1356     if (!txn) {
1357         return false;
1358     }
1359
1360     if (msg->type == JSONRPC_ERROR) {
1361         status = TXN_ERROR;
1362     } else if (msg->result->type != JSON_ARRAY) {
1363         VLOG_WARN_RL(&syntax_rl, "reply to \"transact\" is not JSON array");
1364         status = TXN_ERROR;
1365     } else {
1366         int hard_errors = 0;
1367         int soft_errors = 0;
1368         size_t i;
1369
1370         for (i = 0; i < msg->result->u.array.n; i++) {
1371             struct json *json = msg->result->u.array.elems[i];
1372
1373             if (json->type == JSON_NULL) {
1374                 /* This isn't an error in itself but indicates that some prior
1375                  * operation failed, so make sure that we know about it. */
1376                 soft_errors++;
1377             } else if (json->type == JSON_OBJECT) {
1378                 struct json *error;
1379
1380                 error = shash_find_data(json_object(json), "error");
1381                 if (error) {
1382                     if (error->type == JSON_STRING) {
1383                         if (!strcmp(error->u.string, "timed out")) {
1384                             soft_errors++;
1385                         } else if (strcmp(error->u.string, "aborted")) {
1386                             hard_errors++;
1387                         }
1388                     } else {
1389                         hard_errors++;
1390                         VLOG_WARN_RL(&syntax_rl,
1391                                      "\"error\" in reply is not JSON string");
1392                     }
1393                 }
1394             } else {
1395                 hard_errors++;
1396                 VLOG_WARN_RL(&syntax_rl,
1397                              "operation reply is not JSON null or object");
1398             }
1399         }
1400
1401         if (txn->inc_table
1402             && !soft_errors
1403             && !hard_errors
1404             && !ovsdb_idl_txn_process_inc_reply(txn,
1405                                                 json_array(msg->result))) {
1406             hard_errors++;
1407         }
1408
1409         status = (hard_errors ? TXN_ERROR
1410                   : soft_errors ? TXN_TRY_AGAIN
1411                   : TXN_SUCCESS);
1412     }
1413
1414     ovsdb_idl_txn_complete(txn, status);
1415     return true;
1416 }
1417
1418 struct ovsdb_idl_txn *
1419 ovsdb_idl_txn_get(const struct ovsdb_idl_row *row)
1420 {
1421     struct ovsdb_idl_txn *txn = row->table->idl->txn;
1422     assert(txn != NULL);
1423     return txn;
1424 }