ovsdb: Provide helper function to determine if IDL has ever connected
[sliver-openvswitch.git] / lib / ovsdb-idl.c
1 /* Copyright (c) 2009, 2010 Nicira Networks.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "ovsdb-idl.h"
19
20 #include <assert.h>
21 #include <errno.h>
22 #include <inttypes.h>
23 #include <limits.h>
24 #include <stdlib.h>
25
26 #include "bitmap.h"
27 #include "dynamic-string.h"
28 #include "json.h"
29 #include "jsonrpc.h"
30 #include "ovsdb-data.h"
31 #include "ovsdb-error.h"
32 #include "ovsdb-idl-provider.h"
33 #include "poll-loop.h"
34 #include "shash.h"
35 #include "util.h"
36
37 #define THIS_MODULE VLM_ovsdb_idl
38 #include "vlog.h"
39
40 /* An arc from one idl_row to another.  When row A contains a UUID that
41  * references row B, this is represented by an arc from A (the source) to B
42  * (the destination).
43  *
44  * Arcs from a row to itself are omitted, that is, src and dst are always
45  * different.
46  *
47  * Arcs are never duplicated, that is, even if there are multiple references
48  * from A to B, there is only a single arc from A to B.
49  *
50  * Arcs are directed: an arc from A to B is the converse of an an arc from B to
51  * A.  Both an arc and its converse may both be present, if each row refers
52  * to the other circularly.
53  *
54  * The source and destination row may be in the same table or in different
55  * tables.
56  */
57 struct ovsdb_idl_arc {
58     struct list src_node;       /* In src->src_arcs list. */
59     struct list dst_node;       /* In dst->dst_arcs list. */
60     struct ovsdb_idl_row *src;  /* Source row. */
61     struct ovsdb_idl_row *dst;  /* Destination row. */
62 };
63
64 struct ovsdb_idl {
65     const struct ovsdb_idl_class *class;
66     struct jsonrpc_session *session;
67     struct shash table_by_name;
68     struct ovsdb_idl_table *tables;
69     struct json *monitor_request_id;
70     unsigned int last_monitor_request_seqno;
71     unsigned int change_seqno;
72
73     /* Transaction support. */
74     struct ovsdb_idl_txn *txn;
75     struct hmap outstanding_txns;
76 };
77
78 struct ovsdb_idl_txn {
79     struct hmap_node hmap_node;
80     struct json *request_id;
81     struct ovsdb_idl *idl;
82     struct hmap txn_rows;
83     enum ovsdb_idl_txn_status status;
84     bool dry_run;
85     struct ds comment;
86
87     /* Increments. */
88     char *inc_table;
89     char *inc_column;
90     struct json *inc_where;
91     unsigned int inc_index;
92     int64_t inc_new_value;
93 };
94
95 static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);
96 static struct vlog_rate_limit semantic_rl = VLOG_RATE_LIMIT_INIT(1, 5);
97
98 static void ovsdb_idl_clear(struct ovsdb_idl *);
99 static void ovsdb_idl_send_monitor_request(struct ovsdb_idl *);
100 static void ovsdb_idl_parse_update(struct ovsdb_idl *, const struct json *);
101 static struct ovsdb_error *ovsdb_idl_parse_update__(struct ovsdb_idl *,
102                                                     const struct json *);
103 static void ovsdb_idl_process_update(struct ovsdb_idl_table *,
104                                      const struct uuid *,
105                                      const struct json *old,
106                                      const struct json *new);
107 static void ovsdb_idl_insert_row(struct ovsdb_idl_row *, const struct json *);
108 static void ovsdb_idl_delete_row(struct ovsdb_idl_row *);
109 static void ovsdb_idl_modify_row(struct ovsdb_idl_row *, const struct json *);
110
111 static bool ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *);
112 static struct ovsdb_idl_row *ovsdb_idl_row_create__(
113     const struct ovsdb_idl_table_class *);
114 static struct ovsdb_idl_row *ovsdb_idl_row_create(struct ovsdb_idl_table *,
115                                                   const struct uuid *);
116 static void ovsdb_idl_row_destroy(struct ovsdb_idl_row *);
117
118 static void ovsdb_idl_row_clear_old(struct ovsdb_idl_row *);
119 static void ovsdb_idl_row_clear_new(struct ovsdb_idl_row *);
120
121 static void ovsdb_idl_txn_abort_all(struct ovsdb_idl *);
122 static bool ovsdb_idl_txn_process_reply(struct ovsdb_idl *,
123                                         const struct jsonrpc_msg *msg);
124
125 struct ovsdb_idl *
126 ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class)
127 {
128     struct ovsdb_idl *idl;
129     size_t i;
130
131     idl = xzalloc(sizeof *idl);
132     idl->class = class;
133     idl->session = jsonrpc_session_open(remote);
134     shash_init(&idl->table_by_name);
135     idl->tables = xmalloc(class->n_tables * sizeof *idl->tables);
136     for (i = 0; i < class->n_tables; i++) {
137         const struct ovsdb_idl_table_class *tc = &class->tables[i];
138         struct ovsdb_idl_table *table = &idl->tables[i];
139         size_t j;
140
141         assert(!shash_find(&idl->table_by_name, tc->name));
142         shash_add(&idl->table_by_name, tc->name, table);
143         table->class = tc;
144         shash_init(&table->columns);
145         for (j = 0; j < tc->n_columns; j++) {
146             const struct ovsdb_idl_column *column = &tc->columns[j];
147
148             assert(!shash_find(&table->columns, column->name));
149             shash_add(&table->columns, column->name, column);
150         }
151         hmap_init(&table->rows);
152         table->idl = idl;
153     }
154     idl->last_monitor_request_seqno = UINT_MAX;
155     hmap_init(&idl->outstanding_txns);
156
157     return idl;
158 }
159
160 void
161 ovsdb_idl_destroy(struct ovsdb_idl *idl)
162 {
163     if (idl) {
164         size_t i;
165
166         assert(!idl->txn);
167         ovsdb_idl_clear(idl);
168         jsonrpc_session_close(idl->session);
169
170         for (i = 0; i < idl->class->n_tables; i++) {
171             struct ovsdb_idl_table *table = &idl->tables[i];
172             shash_destroy(&table->columns);
173             hmap_destroy(&table->rows);
174         }
175         shash_destroy(&idl->table_by_name);
176         free(idl->tables);
177         json_destroy(idl->monitor_request_id);
178         free(idl);
179     }
180 }
181
182 static void
183 ovsdb_idl_clear(struct ovsdb_idl *idl)
184 {
185     bool changed = false;
186     size_t i;
187
188     for (i = 0; i < idl->class->n_tables; i++) {
189         struct ovsdb_idl_table *table = &idl->tables[i];
190         struct ovsdb_idl_row *row, *next_row;
191
192         if (hmap_is_empty(&table->rows)) {
193             continue;
194         }
195
196         changed = true;
197         HMAP_FOR_EACH_SAFE (row, next_row, struct ovsdb_idl_row, hmap_node,
198                             &table->rows) {
199             struct ovsdb_idl_arc *arc, *next_arc;
200
201             if (!ovsdb_idl_row_is_orphan(row)) {
202                 (row->table->class->unparse)(row);
203                 ovsdb_idl_row_clear_old(row);
204             }
205             hmap_remove(&table->rows, &row->hmap_node);
206             LIST_FOR_EACH_SAFE (arc, next_arc, struct ovsdb_idl_arc, src_node,
207                                 &row->src_arcs) {
208                 free(arc);
209             }
210             /* No need to do anything with dst_arcs: some node has those arcs
211              * as forward arcs and will destroy them itself. */
212
213             free(row);
214         }
215     }
216
217     if (changed) {
218         idl->change_seqno++;
219     }
220 }
221
222 void
223 ovsdb_idl_run(struct ovsdb_idl *idl)
224 {
225     int i;
226
227     assert(!idl->txn);
228     jsonrpc_session_run(idl->session);
229     for (i = 0; jsonrpc_session_is_connected(idl->session) && i < 50; i++) {
230         struct jsonrpc_msg *msg, *reply;
231         unsigned int seqno;
232
233         seqno = jsonrpc_session_get_seqno(idl->session);
234         if (idl->last_monitor_request_seqno != seqno) {
235             idl->last_monitor_request_seqno = seqno;
236             ovsdb_idl_txn_abort_all(idl);
237             ovsdb_idl_send_monitor_request(idl);
238             break;
239         }
240
241         msg = jsonrpc_session_recv(idl->session);
242         if (!msg) {
243             break;
244         }
245
246         reply = NULL;
247         if (msg->type == JSONRPC_NOTIFY
248                    && !strcmp(msg->method, "update")
249                    && msg->params->type == JSON_ARRAY
250                    && msg->params->u.array.n == 2
251                    && msg->params->u.array.elems[0]->type == JSON_NULL) {
252             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1]);
253         } else if (msg->type == JSONRPC_REPLY
254                    && idl->monitor_request_id
255                    && json_equal(idl->monitor_request_id, msg->id)) {
256             json_destroy(idl->monitor_request_id);
257             idl->monitor_request_id = NULL;
258             ovsdb_idl_clear(idl);
259             ovsdb_idl_parse_update(idl, msg->result);
260         } else if (msg->type == JSONRPC_REPLY
261                    && msg->id && msg->id->type == JSON_STRING
262                    && !strcmp(msg->id->u.string, "echo")) {
263             /* It's a reply to our echo request.  Ignore it. */
264         } else if ((msg->type == JSONRPC_ERROR
265                     || msg->type == JSONRPC_REPLY)
266                    && ovsdb_idl_txn_process_reply(idl, msg)) {
267             /* ovsdb_idl_txn_process_reply() did everything needful. */
268         } else {
269             VLOG_WARN("%s: received unexpected %s message",
270                       jsonrpc_session_get_name(idl->session),
271                       jsonrpc_msg_type_to_string(msg->type));
272             jsonrpc_session_force_reconnect(idl->session);
273         }
274         if (reply) {
275             jsonrpc_session_send(idl->session, reply);
276         }
277         jsonrpc_msg_destroy(msg);
278     }
279 }
280
281 void
282 ovsdb_idl_wait(struct ovsdb_idl *idl)
283 {
284     jsonrpc_session_wait(idl->session);
285     jsonrpc_session_recv_wait(idl->session);
286 }
287
288 unsigned int
289 ovsdb_idl_get_seqno(const struct ovsdb_idl *idl)
290 {
291     return idl->change_seqno;
292 }
293
294 bool
295 ovsdb_idl_has_ever_connected(const struct ovsdb_idl *idl)
296 {
297     return ovsdb_idl_get_seqno(idl) != 0;
298 }
299
300 void
301 ovsdb_idl_force_reconnect(struct ovsdb_idl *idl)
302 {
303     jsonrpc_session_force_reconnect(idl->session);
304 }
305 \f
306 static void
307 ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl)
308 {
309     struct json *monitor_requests;
310     struct jsonrpc_msg *msg;
311     size_t i;
312
313     monitor_requests = json_object_create();
314     for (i = 0; i < idl->class->n_tables; i++) {
315         const struct ovsdb_idl_table *table = &idl->tables[i];
316         const struct ovsdb_idl_table_class *tc = table->class;
317         struct json *monitor_request, *columns;
318         size_t i;
319
320         monitor_request = json_object_create();
321         columns = json_array_create_empty();
322         for (i = 0; i < tc->n_columns; i++) {
323             const struct ovsdb_idl_column *column = &tc->columns[i];
324             json_array_add(columns, json_string_create(column->name));
325         }
326         json_object_put(monitor_request, "columns", columns);
327         json_object_put(monitor_requests, tc->name, monitor_request);
328     }
329
330     json_destroy(idl->monitor_request_id);
331     msg = jsonrpc_create_request(
332         "monitor", json_array_create_2(json_null_create(), monitor_requests),
333         &idl->monitor_request_id);
334     jsonrpc_session_send(idl->session, msg);
335 }
336
337 static void
338 ovsdb_idl_parse_update(struct ovsdb_idl *idl, const struct json *table_updates)
339 {
340     struct ovsdb_error *error;
341
342     idl->change_seqno++;
343
344     error = ovsdb_idl_parse_update__(idl, table_updates);
345     if (error) {
346         if (!VLOG_DROP_WARN(&syntax_rl)) {
347             char *s = ovsdb_error_to_string(error);
348             VLOG_WARN_RL(&syntax_rl, "%s", s);
349             free(s);
350         }
351         ovsdb_error_destroy(error);
352     }
353 }
354
355 static struct ovsdb_error *
356 ovsdb_idl_parse_update__(struct ovsdb_idl *idl,
357                          const struct json *table_updates)
358 {
359     const struct shash_node *tables_node;
360
361     if (table_updates->type != JSON_OBJECT) {
362         return ovsdb_syntax_error(table_updates, NULL,
363                                   "<table-updates> is not an object");
364     }
365     SHASH_FOR_EACH (tables_node, json_object(table_updates)) {
366         const struct json *table_update = tables_node->data;
367         const struct shash_node *table_node;
368         struct ovsdb_idl_table *table;
369
370         table = shash_find_data(&idl->table_by_name, tables_node->name);
371         if (!table) {
372             return ovsdb_syntax_error(
373                 table_updates, NULL,
374                 "<table-updates> includes unknown table \"%s\"",
375                 tables_node->name);
376         }
377
378         if (table_update->type != JSON_OBJECT) {
379             return ovsdb_syntax_error(table_update, NULL,
380                                       "<table-update> for table \"%s\" is "
381                                       "not an object", table->class->name);
382         }
383         SHASH_FOR_EACH (table_node, json_object(table_update)) {
384             const struct json *row_update = table_node->data;
385             const struct json *old_json, *new_json;
386             struct uuid uuid;
387
388             if (!uuid_from_string(&uuid, table_node->name)) {
389                 return ovsdb_syntax_error(table_update, NULL,
390                                           "<table-update> for table \"%s\" "
391                                           "contains bad UUID "
392                                           "\"%s\" as member name",
393                                           table->class->name,
394                                           table_node->name);
395             }
396             if (row_update->type != JSON_OBJECT) {
397                 return ovsdb_syntax_error(row_update, NULL,
398                                           "<table-update> for table \"%s\" "
399                                           "contains <row-update> for %s that "
400                                           "is not an object",
401                                           table->class->name,
402                                           table_node->name);
403             }
404
405             old_json = shash_find_data(json_object(row_update), "old");
406             new_json = shash_find_data(json_object(row_update), "new");
407             if (old_json && old_json->type != JSON_OBJECT) {
408                 return ovsdb_syntax_error(old_json, NULL,
409                                           "\"old\" <row> is not object");
410             } else if (new_json && new_json->type != JSON_OBJECT) {
411                 return ovsdb_syntax_error(new_json, NULL,
412                                           "\"new\" <row> is not object");
413             } else if ((old_json != NULL) + (new_json != NULL)
414                        != shash_count(json_object(row_update))) {
415                 return ovsdb_syntax_error(row_update, NULL,
416                                           "<row-update> contains unexpected "
417                                           "member");
418             } else if (!old_json && !new_json) {
419                 return ovsdb_syntax_error(row_update, NULL,
420                                           "<row-update> missing \"old\" "
421                                           "and \"new\" members");
422             }
423
424             ovsdb_idl_process_update(table, &uuid, old_json, new_json);
425         }
426     }
427
428     return NULL;
429 }
430
431 static struct ovsdb_idl_row *
432 ovsdb_idl_get_row(struct ovsdb_idl_table *table, const struct uuid *uuid)
433 {
434     struct ovsdb_idl_row *row;
435
436     HMAP_FOR_EACH_WITH_HASH (row, struct ovsdb_idl_row, hmap_node,
437                              uuid_hash(uuid), &table->rows) {
438         if (uuid_equals(&row->uuid, uuid)) {
439             return row;
440         }
441     }
442     return NULL;
443 }
444
445 static void
446 ovsdb_idl_process_update(struct ovsdb_idl_table *table,
447                          const struct uuid *uuid, const struct json *old,
448                          const struct json *new)
449 {
450     struct ovsdb_idl_row *row;
451
452     row = ovsdb_idl_get_row(table, uuid);
453     if (!new) {
454         /* Delete row. */
455         if (row && !ovsdb_idl_row_is_orphan(row)) {
456             /* XXX perhaps we should check the 'old' values? */
457             ovsdb_idl_delete_row(row);
458         } else {
459             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
460                          "from table %s",
461                          UUID_ARGS(uuid), table->class->name);
462         }
463     } else if (!old) {
464         /* Insert row. */
465         if (!row) {
466             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
467         } else if (ovsdb_idl_row_is_orphan(row)) {
468             ovsdb_idl_insert_row(row, new);
469         } else {
470             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
471                          "table %s", UUID_ARGS(uuid), table->class->name);
472             ovsdb_idl_modify_row(row, new);
473         }
474     } else {
475         /* Modify row. */
476         if (row) {
477             /* XXX perhaps we should check the 'old' values? */
478             if (!ovsdb_idl_row_is_orphan(row)) {
479                 ovsdb_idl_modify_row(row, new);
480             } else {
481                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
482                              "referenced row "UUID_FMT" in table %s",
483                              UUID_ARGS(uuid), table->class->name);
484                 ovsdb_idl_insert_row(row, new);
485             }
486         } else {
487             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
488                          "in table %s", UUID_ARGS(uuid), table->class->name);
489             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
490         }
491     }
492 }
493
494 static void
495 ovsdb_idl_row_update(struct ovsdb_idl_row *row, const struct json *row_json)
496 {
497     struct ovsdb_idl_table *table = row->table;
498     struct shash_node *node;
499
500     SHASH_FOR_EACH (node, json_object(row_json)) {
501         const char *column_name = node->name;
502         const struct ovsdb_idl_column *column;
503         struct ovsdb_datum datum;
504         struct ovsdb_error *error;
505
506         column = shash_find_data(&table->columns, column_name);
507         if (!column) {
508             VLOG_WARN_RL(&syntax_rl, "unknown column %s updating row "UUID_FMT,
509                          column_name, UUID_ARGS(&row->uuid));
510             continue;
511         }
512
513         error = ovsdb_datum_from_json(&datum, &column->type, node->data, NULL);
514         if (!error) {
515             ovsdb_datum_swap(&row->old[column - table->class->columns],
516                              &datum);
517             ovsdb_datum_destroy(&datum, &column->type);
518         } else {
519             char *s = ovsdb_error_to_string(error);
520             VLOG_WARN_RL(&syntax_rl, "error parsing column %s in row "UUID_FMT
521                          " in table %s: %s", column_name,
522                          UUID_ARGS(&row->uuid), table->class->name, s);
523             free(s);
524             ovsdb_error_destroy(error);
525         }
526     }
527 }
528
529 static bool
530 ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *row)
531 {
532     return !row->old;
533 }
534
535 static void
536 ovsdb_idl_row_clear_old(struct ovsdb_idl_row *row)
537 {
538     assert(row->old == row->new);
539     if (!ovsdb_idl_row_is_orphan(row)) {
540         const struct ovsdb_idl_table_class *class = row->table->class;
541         size_t i;
542
543         for (i = 0; i < class->n_columns; i++) {
544             ovsdb_datum_destroy(&row->old[i], &class->columns[i].type);
545         }
546         free(row->old);
547         row->old = row->new = NULL;
548     }
549 }
550
551 static void
552 ovsdb_idl_row_clear_new(struct ovsdb_idl_row *row)
553 {
554     if (row->old != row->new) {
555         if (row->new) {
556             const struct ovsdb_idl_table_class *class = row->table->class;
557             size_t i;
558
559             BITMAP_FOR_EACH_1 (i, class->n_columns, row->written) {
560                 ovsdb_datum_destroy(&row->new[i], &class->columns[i].type);
561             }
562             free(row->new);
563             free(row->written);
564             row->written = NULL;
565         }
566         row->new = row->old;
567     }
568 }
569
570 static void
571 ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *row, bool destroy_dsts)
572 {
573     struct ovsdb_idl_arc *arc, *next;
574
575     /* Delete all forward arcs.  If 'destroy_dsts', destroy any orphaned rows
576      * that this causes to be unreferenced. */
577     LIST_FOR_EACH_SAFE (arc, next, struct ovsdb_idl_arc, src_node,
578                         &row->src_arcs) {
579         list_remove(&arc->dst_node);
580         if (destroy_dsts
581             && ovsdb_idl_row_is_orphan(arc->dst)
582             && list_is_empty(&arc->dst->dst_arcs)) {
583             ovsdb_idl_row_destroy(arc->dst);
584         }
585         free(arc);
586     }
587     list_init(&row->src_arcs);
588 }
589
590 /* Force nodes that reference 'row' to reparse. */
591 static void
592 ovsdb_idl_row_reparse_backrefs(struct ovsdb_idl_row *row, bool destroy_dsts)
593 {
594     struct ovsdb_idl_arc *arc, *next;
595
596     /* This is trickier than it looks.  ovsdb_idl_row_clear_arcs() will destroy
597      * 'arc', so we need to use the "safe" variant of list traversal.  However,
598      * calling ref->table->class->parse will add an arc equivalent to 'arc' to
599      * row->arcs.  That could be a problem for traversal, but it adds it at the
600      * beginning of the list to prevent us from stumbling upon it again.
601      *
602      * (If duplicate arcs were possible then we would need to make sure that
603      * 'next' didn't also point into 'arc''s destination, but we forbid
604      * duplicate arcs.) */
605     LIST_FOR_EACH_SAFE (arc, next, struct ovsdb_idl_arc, dst_node,
606                         &row->dst_arcs) {
607         struct ovsdb_idl_row *ref = arc->src;
608
609         (ref->table->class->unparse)(ref);
610         ovsdb_idl_row_clear_arcs(ref, destroy_dsts);
611         (ref->table->class->parse)(ref);
612     }
613 }
614
615 static struct ovsdb_idl_row *
616 ovsdb_idl_row_create__(const struct ovsdb_idl_table_class *class)
617 {
618     struct ovsdb_idl_row *row = xzalloc(class->allocation_size);
619     memset(row, 0, sizeof *row);
620     list_init(&row->src_arcs);
621     list_init(&row->dst_arcs);
622     hmap_node_nullify(&row->txn_node);
623     return row;
624 }
625
626 static struct ovsdb_idl_row *
627 ovsdb_idl_row_create(struct ovsdb_idl_table *table, const struct uuid *uuid)
628 {
629     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(table->class);
630     hmap_insert(&table->rows, &row->hmap_node, uuid_hash(uuid));
631     row->uuid = *uuid;
632     row->table = table;
633     return row;
634 }
635
636 static void
637 ovsdb_idl_row_destroy(struct ovsdb_idl_row *row)
638 {
639     if (row) {
640         ovsdb_idl_row_clear_old(row);
641         hmap_remove(&row->table->rows, &row->hmap_node);
642         free(row);
643     }
644 }
645
646 static void
647 ovsdb_idl_insert_row(struct ovsdb_idl_row *row, const struct json *row_json)
648 {
649     const struct ovsdb_idl_table_class *class = row->table->class;
650     size_t i;
651
652     assert(!row->old && !row->new);
653     row->old = row->new = xmalloc(class->n_columns * sizeof *row->old);
654     for (i = 0; i < class->n_columns; i++) {
655         ovsdb_datum_init_default(&row->old[i], &class->columns[i].type);
656     }
657     ovsdb_idl_row_update(row, row_json);
658     (class->parse)(row);
659
660     ovsdb_idl_row_reparse_backrefs(row, false);
661 }
662
663 static void
664 ovsdb_idl_delete_row(struct ovsdb_idl_row *row)
665 {
666     (row->table->class->unparse)(row);
667     ovsdb_idl_row_clear_arcs(row, true);
668     ovsdb_idl_row_clear_old(row);
669     if (list_is_empty(&row->dst_arcs)) {
670         ovsdb_idl_row_destroy(row);
671     } else {
672         ovsdb_idl_row_reparse_backrefs(row, true);
673     }
674 }
675
676 static void
677 ovsdb_idl_modify_row(struct ovsdb_idl_row *row, const struct json *row_json)
678 {
679     (row->table->class->unparse)(row);
680     ovsdb_idl_row_clear_arcs(row, true);
681     ovsdb_idl_row_update(row, row_json);
682     (row->table->class->parse)(row);
683 }
684
685 static bool
686 may_add_arc(const struct ovsdb_idl_row *src, const struct ovsdb_idl_row *dst)
687 {
688     const struct ovsdb_idl_arc *arc;
689
690     /* No self-arcs. */
691     if (src == dst) {
692         return false;
693     }
694
695     /* No duplicate arcs.
696      *
697      * We only need to test whether the first arc in dst->dst_arcs originates
698      * at 'src', since we add all of the arcs from a given source in a clump
699      * (in a single call to a row's ->parse function) and new arcs are always
700      * added at the front of the dst_arcs list. */
701     if (list_is_empty(&dst->dst_arcs)) {
702         return true;
703     }
704     arc = CONTAINER_OF(dst->dst_arcs.next, struct ovsdb_idl_arc, dst_node);
705     return arc->src != src;
706 }
707
708 static struct ovsdb_idl_table *
709 ovsdb_idl_table_from_class(const struct ovsdb_idl *idl,
710                            const struct ovsdb_idl_table_class *table_class)
711 {
712     return &idl->tables[table_class - idl->class->tables];
713 }
714
715 struct ovsdb_idl_row *
716 ovsdb_idl_get_row_arc(struct ovsdb_idl_row *src,
717                       struct ovsdb_idl_table_class *dst_table_class,
718                       const struct uuid *dst_uuid)
719 {
720     struct ovsdb_idl *idl = src->table->idl;
721     struct ovsdb_idl_table *dst_table;
722     struct ovsdb_idl_arc *arc;
723     struct ovsdb_idl_row *dst;
724
725     dst_table = ovsdb_idl_table_from_class(idl, dst_table_class);
726     dst = ovsdb_idl_get_row(dst_table, dst_uuid);
727     if (!dst) {
728         dst = ovsdb_idl_row_create(dst_table, dst_uuid);
729     }
730
731     /* Add a new arc, if it wouldn't be a self-arc or a duplicate arc. */
732     if (may_add_arc(src, dst)) {
733         /* The arc *must* be added at the front of the dst_arcs list.  See
734          * ovsdb_idl_row_reparse_backrefs() for details. */
735         arc = xmalloc(sizeof *arc);
736         list_push_front(&src->src_arcs, &arc->src_node);
737         list_push_front(&dst->dst_arcs, &arc->dst_node);
738         arc->src = src;
739         arc->dst = dst;
740     }
741
742     return !ovsdb_idl_row_is_orphan(dst) ? dst : NULL;
743 }
744
745 static struct ovsdb_idl_row *
746 next_real_row(struct ovsdb_idl_table *table, struct hmap_node *node)
747 {
748     for (; node; node = hmap_next(&table->rows, node)) {
749         struct ovsdb_idl_row *row;
750
751         row = CONTAINER_OF(node, struct ovsdb_idl_row, hmap_node);
752         if (!ovsdb_idl_row_is_orphan(row)) {
753             return row;
754         }
755     }
756     return NULL;
757 }
758
759 struct ovsdb_idl_row *
760 ovsdb_idl_first_row(const struct ovsdb_idl *idl,
761                     const struct ovsdb_idl_table_class *table_class)
762 {
763     struct ovsdb_idl_table *table
764         = ovsdb_idl_table_from_class(idl, table_class);
765     return next_real_row(table, hmap_first(&table->rows));
766 }
767
768 struct ovsdb_idl_row *
769 ovsdb_idl_next_row(const struct ovsdb_idl_row *row)
770 {
771     struct ovsdb_idl_table *table = row->table;
772
773     return next_real_row(table, hmap_next(&table->rows, &row->hmap_node));
774 }
775 \f
776 /* Transactions. */
777
778 static void ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
779                                    enum ovsdb_idl_txn_status);
780
781 const char *
782 ovsdb_idl_txn_status_to_string(enum ovsdb_idl_txn_status status)
783 {
784     switch (status) {
785     case TXN_UNCHANGED:
786         return "unchanged";
787     case TXN_INCOMPLETE:
788         return "incomplete";
789     case TXN_ABORTED:
790         return "aborted";
791     case TXN_SUCCESS:
792         return "success";
793     case TXN_TRY_AGAIN:
794         return "try again";
795     case TXN_ERROR:
796         return "error";
797     }
798     return "<unknown>";
799 }
800
801 struct ovsdb_idl_txn *
802 ovsdb_idl_txn_create(struct ovsdb_idl *idl)
803 {
804     struct ovsdb_idl_txn *txn;
805
806     assert(!idl->txn);
807     idl->txn = txn = xmalloc(sizeof *txn);
808     txn->idl = idl;
809     txn->status = TXN_INCOMPLETE;
810     hmap_init(&txn->txn_rows);
811     txn->dry_run = false;
812     ds_init(&txn->comment);
813     txn->inc_table = NULL;
814     txn->inc_column = NULL;
815     txn->inc_where = NULL;
816     return txn;
817 }
818
819 void
820 ovsdb_idl_txn_add_comment(struct ovsdb_idl_txn *txn, const char *s)
821 {
822     if (txn->comment.length) {
823         ds_put_char(&txn->comment, '\n');
824     }
825     ds_put_cstr(&txn->comment, s);
826 }
827
828 void
829 ovsdb_idl_txn_set_dry_run(struct ovsdb_idl_txn *txn)
830 {
831     txn->dry_run = true;
832 }
833
834 void
835 ovsdb_idl_txn_increment(struct ovsdb_idl_txn *txn, const char *table,
836                         const char *column, const struct json *where)
837 {
838     assert(!txn->inc_table);
839     txn->inc_table = xstrdup(table);
840     txn->inc_column = xstrdup(column);
841     txn->inc_where = where ? json_clone(where) : json_array_create_empty();
842 }
843
844 void
845 ovsdb_idl_txn_destroy(struct ovsdb_idl_txn *txn)
846 {
847     if (txn->status == TXN_INCOMPLETE) {
848         hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
849     }
850     ovsdb_idl_txn_abort(txn);
851     ds_destroy(&txn->comment);
852     free(txn->inc_table);
853     free(txn->inc_column);
854     json_destroy(txn->inc_where);
855     free(txn);
856 }
857
858 void
859 ovsdb_idl_txn_wait(const struct ovsdb_idl_txn *txn)
860 {
861     if (txn->status != TXN_INCOMPLETE) {
862         poll_immediate_wake();
863     }
864 }
865
866 static struct json *
867 where_uuid_equals(const struct uuid *uuid)
868 {
869     return
870         json_array_create_1(
871             json_array_create_3(
872                 json_string_create("_uuid"),
873                 json_string_create("=="),
874                 json_array_create_2(
875                     json_string_create("uuid"),
876                     json_string_create_nocopy(
877                         xasprintf(UUID_FMT, UUID_ARGS(uuid))))));
878 }
879
880 static char *
881 uuid_name_from_uuid(const struct uuid *uuid)
882 {
883     char *name;
884     char *p;
885
886     name = xasprintf("row"UUID_FMT, UUID_ARGS(uuid));
887     for (p = name; *p != '\0'; p++) {
888         if (*p == '-') {
889             *p = '_';
890         }
891     }
892
893     return name;
894 }
895
896 static const struct ovsdb_idl_row *
897 ovsdb_idl_txn_get_row(const struct ovsdb_idl_txn *txn, const struct uuid *uuid)
898 {
899     const struct ovsdb_idl_row *row;
900
901     HMAP_FOR_EACH_WITH_HASH (row, struct ovsdb_idl_row, txn_node,
902                              uuid_hash(uuid), &txn->txn_rows) {
903         if (uuid_equals(&row->uuid, uuid)) {
904             return row;
905         }
906     }
907     return NULL;
908 }
909
910 /* XXX there must be a cleaner way to do this */
911 static struct json *
912 substitute_uuids(struct json *json, const struct ovsdb_idl_txn *txn)
913 {
914     if (json->type == JSON_ARRAY) {
915         struct uuid uuid;
916         size_t i;
917
918         if (json->u.array.n == 2
919             && json->u.array.elems[0]->type == JSON_STRING
920             && json->u.array.elems[1]->type == JSON_STRING
921             && !strcmp(json->u.array.elems[0]->u.string, "uuid")
922             && uuid_from_string(&uuid, json->u.array.elems[1]->u.string)) {
923             const struct ovsdb_idl_row *row;
924
925             row = ovsdb_idl_txn_get_row(txn, &uuid);
926             if (row && !row->old && row->new) {
927                 json_destroy(json);
928
929                 return json_array_create_2(
930                     json_string_create("named-uuid"),
931                     json_string_create_nocopy(uuid_name_from_uuid(&uuid)));
932             }
933         }
934
935         for (i = 0; i < json->u.array.n; i++) {
936             json->u.array.elems[i] = substitute_uuids(json->u.array.elems[i],
937                                                       txn);
938         }
939     } else if (json->type == JSON_OBJECT) {
940         struct shash_node *node;
941
942         SHASH_FOR_EACH (node, json_object(json)) {
943             node->data = substitute_uuids(node->data, txn);
944         }
945     }
946     return json;
947 }
948
949 static void
950 ovsdb_idl_txn_disassemble(struct ovsdb_idl_txn *txn)
951 {
952     struct ovsdb_idl_row *row, *next;
953
954     HMAP_FOR_EACH_SAFE (row, next, struct ovsdb_idl_row, txn_node,
955                         &txn->txn_rows) {
956         if (row->old && row->written) {
957             (row->table->class->unparse)(row);
958             ovsdb_idl_row_clear_arcs(row, false);
959             (row->table->class->parse)(row);
960         }
961         ovsdb_idl_row_clear_new(row);
962
963         free(row->prereqs);
964         row->prereqs = NULL;
965
966         free(row->written);
967         row->written = NULL;
968
969         hmap_remove(&txn->txn_rows, &row->txn_node);
970         hmap_node_nullify(&row->txn_node);
971     }
972     hmap_destroy(&txn->txn_rows);
973     hmap_init(&txn->txn_rows);
974 }
975
976 enum ovsdb_idl_txn_status
977 ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
978 {
979     struct ovsdb_idl_row *row;
980     struct json *operations;
981     bool any_updates;
982
983     if (txn != txn->idl->txn) {
984         return txn->status;
985     }
986
987     operations = json_array_create_empty();
988
989     /* Add prerequisites and declarations of new rows. */
990     HMAP_FOR_EACH (row, struct ovsdb_idl_row, txn_node, &txn->txn_rows) {
991         /* XXX check that deleted rows exist even if no prereqs? */
992         if (row->prereqs) {
993             const struct ovsdb_idl_table_class *class = row->table->class;
994             size_t n_columns = class->n_columns;
995             struct json *op, *columns, *row_json;
996             size_t idx;
997
998             op = json_object_create();
999             json_array_add(operations, op);
1000             json_object_put_string(op, "op", "wait");
1001             json_object_put_string(op, "table", class->name);
1002             json_object_put(op, "timeout", json_integer_create(0));
1003             json_object_put(op, "where", where_uuid_equals(&row->uuid));
1004             json_object_put_string(op, "until", "==");
1005             columns = json_array_create_empty();
1006             json_object_put(op, "columns", columns);
1007             row_json = json_object_create();
1008             json_object_put(op, "rows", json_array_create_1(row_json));
1009
1010             BITMAP_FOR_EACH_1 (idx, n_columns, row->prereqs) {
1011                 const struct ovsdb_idl_column *column = &class->columns[idx];
1012                 json_array_add(columns, json_string_create(column->name));
1013                 json_object_put(row_json, column->name,
1014                                 ovsdb_datum_to_json(&row->old[idx],
1015                                                     &column->type));
1016             }
1017         }
1018         if (row->new && !row->old) {
1019             struct json *op;
1020
1021             op = json_object_create();
1022             json_array_add(operations, op);
1023             json_object_put_string(op, "op", "declare");
1024             json_object_put(op, "uuid-name",
1025                             json_string_create_nocopy(
1026                                 uuid_name_from_uuid(&row->uuid)));
1027         }
1028     }
1029
1030     /* Add updates. */
1031     any_updates = false;
1032     HMAP_FOR_EACH (row, struct ovsdb_idl_row, txn_node, &txn->txn_rows) {
1033         const struct ovsdb_idl_table_class *class = row->table->class;
1034
1035         if (row->old == row->new) {
1036             continue;
1037         } else if (!row->new) {
1038             struct json *op = json_object_create();
1039             json_object_put_string(op, "op", "delete");
1040             json_object_put_string(op, "table", class->name);
1041             json_object_put(op, "where", where_uuid_equals(&row->uuid));
1042             json_array_add(operations, op);
1043             any_updates = true;
1044         } else {
1045             struct json *row_json;
1046             struct json *op;
1047             size_t idx;
1048
1049             op = json_object_create();
1050             json_object_put_string(op, "op", row->old ? "update" : "insert");
1051             json_object_put_string(op, "table", class->name);
1052             if (row->old) {
1053                 json_object_put(op, "where", where_uuid_equals(&row->uuid));
1054             } else {
1055                 json_object_put(op, "uuid-name",
1056                                 json_string_create_nocopy(
1057                                     uuid_name_from_uuid(&row->uuid)));
1058             }
1059             row_json = json_object_create();
1060             json_object_put(op, "row", row_json);
1061
1062             BITMAP_FOR_EACH_1 (idx, class->n_columns, row->written) {
1063                 const struct ovsdb_idl_column *column = &class->columns[idx];
1064
1065                 if (!row->old || !ovsdb_datum_equals(&row->old[idx],
1066                                                      &row->new[idx],
1067                                                      &column->type)) {
1068                     json_object_put(row_json, column->name,
1069                                     substitute_uuids(
1070                                         ovsdb_datum_to_json(&row->new[idx],
1071                                                             &column->type),
1072                                         txn));
1073                 }
1074             }
1075
1076             if (!row->old || !shash_is_empty(json_object(row_json))) {
1077                 json_array_add(operations, op);
1078                 any_updates = true;
1079             } else {
1080                 json_destroy(op);
1081             }
1082         }
1083     }
1084
1085     /* Add increment. */
1086     if (txn->inc_table && any_updates) {
1087         struct json *op;
1088
1089         txn->inc_index = operations->u.array.n;
1090
1091         op = json_object_create();
1092         json_object_put_string(op, "op", "mutate");
1093         json_object_put_string(op, "table", txn->inc_table);
1094         json_object_put(op, "where",
1095                         substitute_uuids(json_clone(txn->inc_where), txn));
1096         json_object_put(op, "mutations",
1097                         json_array_create_1(
1098                             json_array_create_3(
1099                                 json_string_create(txn->inc_column),
1100                                 json_string_create("+="),
1101                                 json_integer_create(1))));
1102         json_array_add(operations, op);
1103
1104         op = json_object_create();
1105         json_object_put_string(op, "op", "select");
1106         json_object_put_string(op, "table", txn->inc_table);
1107         json_object_put(op, "where",
1108                         substitute_uuids(json_clone(txn->inc_where), txn));
1109         json_object_put(op, "columns",
1110                         json_array_create_1(json_string_create(
1111                                                 txn->inc_column)));
1112         json_array_add(operations, op);
1113     }
1114
1115     if (txn->comment.length) {
1116         struct json *op = json_object_create();
1117         json_object_put_string(op, "op", "comment");
1118         json_object_put_string(op, "comment", ds_cstr(&txn->comment));
1119         json_array_add(operations, op);
1120     }
1121
1122     if (txn->dry_run) {
1123         struct json *op = json_object_create();
1124         json_object_put_string(op, "op", "abort");
1125         json_array_add(operations, op);
1126     }
1127
1128     if (!any_updates) {
1129         txn->status = TXN_UNCHANGED;
1130         json_destroy(operations);
1131     } else if (!jsonrpc_session_send(
1132                    txn->idl->session,
1133                    jsonrpc_create_request(
1134                        "transact", operations, &txn->request_id))) {
1135         hmap_insert(&txn->idl->outstanding_txns, &txn->hmap_node,
1136                     json_hash(txn->request_id, 0));
1137     } else {
1138         txn->status = TXN_INCOMPLETE;
1139     }
1140
1141     txn->idl->txn = NULL;
1142     ovsdb_idl_txn_disassemble(txn);
1143     return txn->status;
1144 }
1145
1146 int64_t
1147 ovsdb_idl_txn_get_increment_new_value(const struct ovsdb_idl_txn *txn)
1148 {
1149     assert(txn->status == TXN_SUCCESS);
1150     return txn->inc_new_value;
1151 }
1152
1153 void
1154 ovsdb_idl_txn_abort(struct ovsdb_idl_txn *txn)
1155 {
1156     ovsdb_idl_txn_disassemble(txn);
1157     if (txn->status == TXN_INCOMPLETE) {
1158         txn->status = TXN_ABORTED;
1159     }
1160 }
1161
1162 static void
1163 ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
1164                        enum ovsdb_idl_txn_status status)
1165 {
1166     txn->status = status;
1167     hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
1168 }
1169
1170 void
1171 ovsdb_idl_txn_write(struct ovsdb_idl_row *row,
1172                     const struct ovsdb_idl_column *column,
1173                     struct ovsdb_datum *datum)
1174 {
1175     const struct ovsdb_idl_table_class *class = row->table->class;
1176     size_t column_idx = column - class->columns;
1177
1178     assert(row->new);
1179     if (hmap_node_is_null(&row->txn_node)) {
1180         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
1181                     uuid_hash(&row->uuid));
1182     }
1183     if (row->old == row->new) {
1184         row->new = xmalloc(class->n_columns * sizeof *row->new);
1185     }
1186     if (!row->written) {
1187         row->written = bitmap_allocate(class->n_columns);
1188     }
1189     if (bitmap_is_set(row->written, column_idx)) {
1190         ovsdb_datum_destroy(&row->new[column_idx], &column->type);
1191     } else {
1192         bitmap_set1(row->written, column_idx);
1193     }
1194     row->new[column_idx] = *datum;
1195 }
1196
1197 void
1198 ovsdb_idl_txn_verify(const struct ovsdb_idl_row *row_,
1199                      const struct ovsdb_idl_column *column)
1200 {
1201     struct ovsdb_idl_row *row = (struct ovsdb_idl_row *) row_;
1202     const struct ovsdb_idl_table_class *class = row->table->class;
1203     size_t column_idx = column - class->columns;
1204
1205     assert(row->new);
1206     if (!row->old
1207         || (row->written && bitmap_is_set(row->written, column_idx))) {
1208         return;
1209     }
1210
1211     if (hmap_node_is_null(&row->txn_node)) {
1212         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
1213                     uuid_hash(&row->uuid));
1214     }
1215     if (!row->prereqs) {
1216         row->prereqs = bitmap_allocate(class->n_columns);
1217     }
1218     bitmap_set1(row->prereqs, column_idx);
1219 }
1220
1221 void
1222 ovsdb_idl_txn_delete(struct ovsdb_idl_row *row)
1223 {
1224     assert(row->new);
1225     if (!row->old) {
1226         ovsdb_idl_row_clear_new(row);
1227         assert(!row->prereqs);
1228         hmap_remove(&row->table->idl->txn->txn_rows, &row->txn_node);
1229         free(row);
1230     }
1231     if (hmap_node_is_null(&row->txn_node)) {
1232         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
1233                     uuid_hash(&row->uuid));
1234     }
1235     ovsdb_idl_row_clear_new(row);
1236     row->new = NULL;
1237 }
1238
1239 struct ovsdb_idl_row *
1240 ovsdb_idl_txn_insert(struct ovsdb_idl_txn *txn,
1241                      const struct ovsdb_idl_table_class *class)
1242 {
1243     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(class);
1244     uuid_generate(&row->uuid);
1245     row->table = ovsdb_idl_table_from_class(txn->idl, class);
1246     row->new = xmalloc(class->n_columns * sizeof *row->new);
1247     row->written = bitmap_allocate(class->n_columns);
1248     hmap_insert(&txn->txn_rows, &row->txn_node, uuid_hash(&row->uuid));
1249     return row;
1250 }
1251
1252 static void
1253 ovsdb_idl_txn_abort_all(struct ovsdb_idl *idl)
1254 {
1255     struct ovsdb_idl_txn *txn;
1256
1257     HMAP_FOR_EACH (txn, struct ovsdb_idl_txn, hmap_node,
1258                    &idl->outstanding_txns) {
1259         ovsdb_idl_txn_complete(txn, TXN_TRY_AGAIN);
1260     }
1261 }
1262
1263 static struct ovsdb_idl_txn *
1264 ovsdb_idl_txn_find(struct ovsdb_idl *idl, const struct json *id)
1265 {
1266     struct ovsdb_idl_txn *txn;
1267
1268     HMAP_FOR_EACH_WITH_HASH (txn, struct ovsdb_idl_txn, hmap_node,
1269                              json_hash(id, 0), &idl->outstanding_txns) {
1270         if (json_equal(id, txn->request_id)) {
1271             return txn;
1272         }
1273     }
1274     return NULL;
1275 }
1276
1277 static bool
1278 check_json_type(const struct json *json, enum json_type type, const char *name)
1279 {
1280     if (!json) {
1281         VLOG_WARN_RL(&syntax_rl, "%s is missing", name);
1282         return false;
1283     } else if (json->type != type) {
1284         VLOG_WARN_RL(&syntax_rl, "%s is %s instead of %s",
1285                      name, json_type_to_string(json->type),
1286                      json_type_to_string(type));
1287         return false;
1288     } else {
1289         return true;
1290     }
1291 }
1292
1293 static bool
1294 ovsdb_idl_txn_process_inc_reply(struct ovsdb_idl_txn *txn,
1295                                 const struct json_array *results)
1296 {
1297     struct json *count, *rows, *row, *column;
1298     struct shash *mutate, *select;
1299
1300     if (txn->inc_index + 2 > results->n) {
1301         VLOG_WARN_RL(&syntax_rl, "reply does not contain enough operations "
1302                      "for increment (has %u, needs %u)",
1303                      results->n, txn->inc_index + 2);
1304         return false;
1305     }
1306
1307     /* We know that this is a JSON objects because the loop in
1308      * ovsdb_idl_txn_process_reply() checked. */
1309     mutate = json_object(results->elems[txn->inc_index]);
1310     count = shash_find_data(mutate, "count");
1311     if (!check_json_type(count, JSON_INTEGER, "\"mutate\" reply \"count\"")) {
1312         return false;
1313     }
1314     if (count->u.integer != 1) {
1315         VLOG_WARN_RL(&syntax_rl,
1316                      "\"mutate\" reply \"count\" is %"PRId64" instead of 1",
1317                      count->u.integer);
1318         return false;
1319     }
1320
1321     select = json_object(results->elems[txn->inc_index + 1]);
1322     rows = shash_find_data(select, "rows");
1323     if (!check_json_type(rows, JSON_ARRAY, "\"select\" reply \"rows\"")) {
1324         return false;
1325     }
1326     if (rows->u.array.n != 1) {
1327         VLOG_WARN_RL(&syntax_rl, "\"select\" reply \"rows\" has %u elements "
1328                      "instead of 1",
1329                      rows->u.array.n);
1330         return false;
1331     }
1332     row = rows->u.array.elems[0];
1333     if (!check_json_type(row, JSON_OBJECT, "\"select\" reply row")) {
1334         return false;
1335     }
1336     column = shash_find_data(json_object(row), txn->inc_column);
1337     if (!check_json_type(column, JSON_INTEGER,
1338                          "\"select\" reply inc column")) {
1339         return false;
1340     }
1341     txn->inc_new_value = column->u.integer;
1342     return true;
1343 }
1344
1345
1346 static bool
1347 ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
1348                             const struct jsonrpc_msg *msg)
1349 {
1350     struct ovsdb_idl_txn *txn;
1351     enum ovsdb_idl_txn_status status;
1352
1353     txn = ovsdb_idl_txn_find(idl, msg->id);
1354     if (!txn) {
1355         return false;
1356     }
1357
1358     if (msg->type == JSONRPC_ERROR) {
1359         status = TXN_ERROR;
1360     } else if (msg->result->type != JSON_ARRAY) {
1361         VLOG_WARN_RL(&syntax_rl, "reply to \"transact\" is not JSON array");
1362         status = TXN_ERROR;
1363     } else {
1364         int hard_errors = 0;
1365         int soft_errors = 0;
1366         size_t i;
1367
1368         for (i = 0; i < msg->result->u.array.n; i++) {
1369             struct json *json = msg->result->u.array.elems[i];
1370
1371             if (json->type == JSON_NULL) {
1372                 /* This isn't an error in itself but indicates that some prior
1373                  * operation failed, so make sure that we know about it. */
1374                 soft_errors++;
1375             } else if (json->type == JSON_OBJECT) {
1376                 struct json *error;
1377
1378                 error = shash_find_data(json_object(json), "error");
1379                 if (error) {
1380                     if (error->type == JSON_STRING) {
1381                         if (!strcmp(error->u.string, "timed out")) {
1382                             soft_errors++;
1383                         } else if (strcmp(error->u.string, "aborted")) {
1384                             hard_errors++;
1385                         }
1386                     } else {
1387                         hard_errors++;
1388                         VLOG_WARN_RL(&syntax_rl,
1389                                      "\"error\" in reply is not JSON string");
1390                     }
1391                 }
1392             } else {
1393                 hard_errors++;
1394                 VLOG_WARN_RL(&syntax_rl,
1395                              "operation reply is not JSON null or object");
1396             }
1397         }
1398
1399         if (txn->inc_table
1400             && !soft_errors
1401             && !hard_errors
1402             && !ovsdb_idl_txn_process_inc_reply(txn,
1403                                                 json_array(msg->result))) {
1404             hard_errors++;
1405         }
1406
1407         status = (hard_errors ? TXN_ERROR
1408                   : soft_errors ? TXN_TRY_AGAIN
1409                   : TXN_SUCCESS);
1410     }
1411
1412     ovsdb_idl_txn_complete(txn, status);
1413     return true;
1414 }
1415
1416 struct ovsdb_idl_txn *
1417 ovsdb_idl_txn_get(const struct ovsdb_idl_row *row)
1418 {
1419     struct ovsdb_idl_txn *txn = row->table->idl->txn;
1420     assert(txn != NULL);
1421     return txn;
1422 }