ovs-vsctl: Fix performance problem.
[sliver-openvswitch.git] / lib / ovsdb-idl.c
1 /* Copyright (c) 2009 Nicira Networks.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "ovsdb-idl.h"
19
20 #include <assert.h>
21 #include <errno.h>
22 #include <limits.h>
23 #include <stdlib.h>
24
25 #include "bitmap.h"
26 #include "json.h"
27 #include "jsonrpc.h"
28 #include "ovsdb-data.h"
29 #include "ovsdb-error.h"
30 #include "ovsdb-idl-provider.h"
31 #include "poll-loop.h"
32 #include "shash.h"
33 #include "util.h"
34
35 #define THIS_MODULE VLM_ovsdb_idl
36 #include "vlog.h"
37
38 /* An arc from one idl_row to another.  When row A contains a UUID that
39  * references row B, this is represented by an arc from A (the source) to B
40  * (the destination).
41  *
42  * Arcs from a row to itself are omitted, that is, src and dst are always
43  * different.
44  *
45  * Arcs are never duplicated, that is, even if there are multiple references
46  * from A to B, there is only a single arc from A to B.
47  *
48  * Arcs are directed: an arc from A to B is the converse of an an arc from B to
49  * A.  Both an arc and its converse may both be present, if each row refers
50  * to the other circularly.
51  *
52  * The source and destination row may be in the same table or in different
53  * tables.
54  */
55 struct ovsdb_idl_arc {
56     struct list src_node;       /* In src->src_arcs list. */
57     struct list dst_node;       /* In dst->dst_arcs list. */
58     struct ovsdb_idl_row *src;  /* Source row. */
59     struct ovsdb_idl_row *dst;  /* Destination row. */
60 };
61
62 struct ovsdb_idl {
63     const struct ovsdb_idl_class *class;
64     struct jsonrpc_session *session;
65     struct shash table_by_name;
66     struct ovsdb_idl_table *tables;
67     struct json *monitor_request_id;
68     unsigned int last_monitor_request_seqno;
69     unsigned int change_seqno;
70
71     /* Transaction support. */
72     struct ovsdb_idl_txn *txn;
73     struct hmap outstanding_txns;
74 };
75
76 struct ovsdb_idl_txn {
77     struct hmap_node hmap_node;
78     struct json *request_id;
79     struct ovsdb_idl *idl;
80     struct hmap txn_rows;
81     enum ovsdb_idl_txn_status status;
82 };
83
84 static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);
85 static struct vlog_rate_limit semantic_rl = VLOG_RATE_LIMIT_INIT(1, 5);
86
87 static void ovsdb_idl_clear(struct ovsdb_idl *);
88 static void ovsdb_idl_send_monitor_request(struct ovsdb_idl *);
89 static void ovsdb_idl_parse_update(struct ovsdb_idl *, const struct json *);
90 static struct ovsdb_error *ovsdb_idl_parse_update__(struct ovsdb_idl *,
91                                                     const struct json *);
92 static void ovsdb_idl_process_update(struct ovsdb_idl_table *,
93                                      const struct uuid *,
94                                      const struct json *old,
95                                      const struct json *new);
96 static void ovsdb_idl_insert_row(struct ovsdb_idl_row *, const struct json *);
97 static void ovsdb_idl_delete_row(struct ovsdb_idl_row *);
98 static void ovsdb_idl_modify_row(struct ovsdb_idl_row *, const struct json *);
99
100 static bool ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *);
101 static struct ovsdb_idl_row *ovsdb_idl_row_create__(
102     const struct ovsdb_idl_table_class *);
103 static struct ovsdb_idl_row *ovsdb_idl_row_create(struct ovsdb_idl_table *,
104                                                   const struct uuid *);
105 static void ovsdb_idl_row_destroy(struct ovsdb_idl_row *);
106
107 static void ovsdb_idl_row_clear_old(struct ovsdb_idl_row *);
108 static void ovsdb_idl_row_clear_new(struct ovsdb_idl_row *);
109
110 static void ovsdb_idl_txn_abort_all(struct ovsdb_idl *);
111 static bool ovsdb_idl_txn_process_reply(struct ovsdb_idl *,
112                                         const struct jsonrpc_msg *msg);
113
114 struct ovsdb_idl *
115 ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class)
116 {
117     struct ovsdb_idl *idl;
118     size_t i;
119
120     idl = xzalloc(sizeof *idl);
121     idl->class = class;
122     idl->session = jsonrpc_session_open(remote);
123     shash_init(&idl->table_by_name);
124     idl->tables = xmalloc(class->n_tables * sizeof *idl->tables);
125     for (i = 0; i < class->n_tables; i++) {
126         const struct ovsdb_idl_table_class *tc = &class->tables[i];
127         struct ovsdb_idl_table *table = &idl->tables[i];
128         size_t j;
129
130         assert(!shash_find(&idl->table_by_name, tc->name));
131         shash_add(&idl->table_by_name, tc->name, table);
132         table->class = tc;
133         shash_init(&table->columns);
134         for (j = 0; j < tc->n_columns; j++) {
135             const struct ovsdb_idl_column *column = &tc->columns[j];
136
137             assert(!shash_find(&table->columns, column->name));
138             shash_add(&table->columns, column->name, column);
139         }
140         hmap_init(&table->rows);
141         table->idl = idl;
142     }
143     idl->last_monitor_request_seqno = UINT_MAX;
144     hmap_init(&idl->outstanding_txns);
145
146     return idl;
147 }
148
149 void
150 ovsdb_idl_destroy(struct ovsdb_idl *idl)
151 {
152     if (idl) {
153         size_t i;
154
155         assert(!idl->txn);
156         ovsdb_idl_clear(idl);
157         jsonrpc_session_close(idl->session);
158
159         for (i = 0; i < idl->class->n_tables; i++) {
160             struct ovsdb_idl_table *table = &idl->tables[i];
161             shash_destroy(&table->columns);
162             hmap_destroy(&table->rows);
163         }
164         shash_destroy(&idl->table_by_name);
165         free(idl->tables);
166         json_destroy(idl->monitor_request_id);
167         free(idl);
168     }
169 }
170
171 static void
172 ovsdb_idl_clear(struct ovsdb_idl *idl)
173 {
174     bool changed = false;
175     size_t i;
176
177     for (i = 0; i < idl->class->n_tables; i++) {
178         struct ovsdb_idl_table *table = &idl->tables[i];
179         struct ovsdb_idl_row *row, *next_row;
180
181         if (hmap_is_empty(&table->rows)) {
182             continue;
183         }
184
185         changed = true;
186         HMAP_FOR_EACH_SAFE (row, next_row, struct ovsdb_idl_row, hmap_node,
187                             &table->rows) {
188             struct ovsdb_idl_arc *arc, *next_arc;
189
190             if (!ovsdb_idl_row_is_orphan(row)) {
191                 (row->table->class->unparse)(row);
192                 ovsdb_idl_row_clear_old(row);
193             }
194             hmap_remove(&table->rows, &row->hmap_node);
195             LIST_FOR_EACH_SAFE (arc, next_arc, struct ovsdb_idl_arc, src_node,
196                                 &row->src_arcs) {
197                 free(arc);
198             }
199             /* No need to do anything with dst_arcs: some node has those arcs
200              * as forward arcs and will destroy them itself. */
201
202             free(row);
203         }
204     }
205
206     if (changed) {
207         idl->change_seqno++;
208     }
209 }
210
211 void
212 ovsdb_idl_run(struct ovsdb_idl *idl)
213 {
214     int i;
215
216     assert(!idl->txn);
217     jsonrpc_session_run(idl->session);
218     for (i = 0; jsonrpc_session_is_connected(idl->session) && i < 50; i++) {
219         struct jsonrpc_msg *msg, *reply;
220         unsigned int seqno;
221
222         seqno = jsonrpc_session_get_seqno(idl->session);
223         if (idl->last_monitor_request_seqno != seqno) {
224             idl->last_monitor_request_seqno = seqno;
225             ovsdb_idl_txn_abort_all(idl);
226             ovsdb_idl_send_monitor_request(idl);
227             break;
228         }
229
230         msg = jsonrpc_session_recv(idl->session);
231         if (!msg) {
232             break;
233         }
234
235         reply = NULL;
236         if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
237             reply = jsonrpc_create_reply(json_clone(msg->params), msg->id);
238         } else if (msg->type == JSONRPC_NOTIFY
239                    && !strcmp(msg->method, "update")
240                    && msg->params->type == JSON_ARRAY
241                    && msg->params->u.array.n == 2
242                    && msg->params->u.array.elems[0]->type == JSON_NULL) {
243             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1]);
244         } else if (msg->type == JSONRPC_REPLY
245                    && idl->monitor_request_id
246                    && json_equal(idl->monitor_request_id, msg->id)) {
247             json_destroy(idl->monitor_request_id);
248             idl->monitor_request_id = NULL;
249             ovsdb_idl_clear(idl);
250             ovsdb_idl_parse_update(idl, msg->result);
251         } else if (msg->type == JSONRPC_REPLY
252                    && msg->id && msg->id->type == JSON_STRING
253                    && !strcmp(msg->id->u.string, "echo")) {
254             /* It's a reply to our echo request.  Ignore it. */
255         } else if ((msg->type == JSONRPC_ERROR
256                     || msg->type == JSONRPC_REPLY)
257                    && ovsdb_idl_txn_process_reply(idl, msg)) {
258             /* ovsdb_idl_txn_process_reply() did everything needful. */
259         } else {
260             VLOG_WARN("%s: received unexpected %s message",
261                       jsonrpc_session_get_name(idl->session),
262                       jsonrpc_msg_type_to_string(msg->type));
263             jsonrpc_session_force_reconnect(idl->session);
264         }
265         if (reply) {
266             jsonrpc_session_send(idl->session, reply);
267         }
268         jsonrpc_msg_destroy(msg);
269     }
270 }
271
272 void
273 ovsdb_idl_wait(struct ovsdb_idl *idl)
274 {
275     jsonrpc_session_wait(idl->session);
276     jsonrpc_session_recv_wait(idl->session);
277 }
278
279 unsigned int
280 ovsdb_idl_get_seqno(const struct ovsdb_idl *idl)
281 {
282     return idl->change_seqno;
283 }
284
285 void
286 ovsdb_idl_force_reconnect(struct ovsdb_idl *idl)
287 {
288     jsonrpc_session_force_reconnect(idl->session);
289 }
290 \f
291 static void
292 ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl)
293 {
294     struct json *monitor_requests;
295     struct jsonrpc_msg *msg;
296     size_t i;
297
298     monitor_requests = json_object_create();
299     for (i = 0; i < idl->class->n_tables; i++) {
300         const struct ovsdb_idl_table *table = &idl->tables[i];
301         const struct ovsdb_idl_table_class *tc = table->class;
302         struct json *monitor_request, *columns;
303         size_t i;
304
305         monitor_request = json_object_create();
306         columns = json_array_create_empty();
307         for (i = 0; i < tc->n_columns; i++) {
308             const struct ovsdb_idl_column *column = &tc->columns[i];
309             json_array_add(columns, json_string_create(column->name));
310         }
311         json_object_put(monitor_request, "columns", columns);
312         json_object_put(monitor_requests, tc->name, monitor_request);
313     }
314
315     json_destroy(idl->monitor_request_id);
316     msg = jsonrpc_create_request(
317         "monitor", json_array_create_2(json_null_create(), monitor_requests),
318         &idl->monitor_request_id);
319     jsonrpc_session_send(idl->session, msg);
320 }
321
322 static void
323 ovsdb_idl_parse_update(struct ovsdb_idl *idl, const struct json *table_updates)
324 {
325     struct ovsdb_error *error;
326
327     idl->change_seqno++;
328
329     error = ovsdb_idl_parse_update__(idl, table_updates);
330     if (error) {
331         if (!VLOG_DROP_WARN(&syntax_rl)) {
332             char *s = ovsdb_error_to_string(error);
333             VLOG_WARN_RL(&syntax_rl, "%s", s);
334             free(s);
335         }
336         ovsdb_error_destroy(error);
337     }
338 }
339
340 static struct ovsdb_error *
341 ovsdb_idl_parse_update__(struct ovsdb_idl *idl,
342                          const struct json *table_updates)
343 {
344     const struct shash_node *tables_node;
345
346     if (table_updates->type != JSON_OBJECT) {
347         return ovsdb_syntax_error(table_updates, NULL,
348                                   "<table-updates> is not an object");
349     }
350     SHASH_FOR_EACH (tables_node, json_object(table_updates)) {
351         const struct json *table_update = tables_node->data;
352         const struct shash_node *table_node;
353         struct ovsdb_idl_table *table;
354
355         table = shash_find_data(&idl->table_by_name, tables_node->name);
356         if (!table) {
357             return ovsdb_syntax_error(
358                 table_updates, NULL,
359                 "<table-updates> includes unknown table \"%s\"",
360                 tables_node->name);
361         }
362
363         if (table_update->type != JSON_OBJECT) {
364             return ovsdb_syntax_error(table_update, NULL,
365                                       "<table-update> for table \"%s\" is "
366                                       "not an object", table->class->name);
367         }
368         SHASH_FOR_EACH (table_node, json_object(table_update)) {
369             const struct json *row_update = table_node->data;
370             const struct json *old_json, *new_json;
371             struct uuid uuid;
372
373             if (!uuid_from_string(&uuid, table_node->name)) {
374                 return ovsdb_syntax_error(table_update, NULL,
375                                           "<table-update> for table \"%s\" "
376                                           "contains bad UUID "
377                                           "\"%s\" as member name",
378                                           table->class->name,
379                                           table_node->name);
380             }
381             if (row_update->type != JSON_OBJECT) {
382                 return ovsdb_syntax_error(row_update, NULL,
383                                           "<table-update> for table \"%s\" "
384                                           "contains <row-update> for %s that "
385                                           "is not an object",
386                                           table->class->name,
387                                           table_node->name);
388             }
389
390             old_json = shash_find_data(json_object(row_update), "old");
391             new_json = shash_find_data(json_object(row_update), "new");
392             if (old_json && old_json->type != JSON_OBJECT) {
393                 return ovsdb_syntax_error(old_json, NULL,
394                                           "\"old\" <row> is not object");
395             } else if (new_json && new_json->type != JSON_OBJECT) {
396                 return ovsdb_syntax_error(new_json, NULL,
397                                           "\"new\" <row> is not object");
398             } else if ((old_json != NULL) + (new_json != NULL)
399                        != shash_count(json_object(row_update))) {
400                 return ovsdb_syntax_error(row_update, NULL,
401                                           "<row-update> contains unexpected "
402                                           "member");
403             } else if (!old_json && !new_json) {
404                 return ovsdb_syntax_error(row_update, NULL,
405                                           "<row-update> missing \"old\" "
406                                           "and \"new\" members");
407             }
408
409             ovsdb_idl_process_update(table, &uuid, old_json, new_json);
410         }
411     }
412
413     return NULL;
414 }
415
416 static struct ovsdb_idl_row *
417 ovsdb_idl_get_row(struct ovsdb_idl_table *table, const struct uuid *uuid)
418 {
419     struct ovsdb_idl_row *row;
420
421     HMAP_FOR_EACH_WITH_HASH (row, struct ovsdb_idl_row, hmap_node,
422                              uuid_hash(uuid), &table->rows) {
423         if (uuid_equals(&row->uuid, uuid)) {
424             return row;
425         }
426     }
427     return NULL;
428 }
429
430 static void
431 ovsdb_idl_process_update(struct ovsdb_idl_table *table,
432                          const struct uuid *uuid, const struct json *old,
433                          const struct json *new)
434 {
435     struct ovsdb_idl_row *row;
436
437     row = ovsdb_idl_get_row(table, uuid);
438     if (!new) {
439         /* Delete row. */
440         if (row && !ovsdb_idl_row_is_orphan(row)) {
441             /* XXX perhaps we should check the 'old' values? */
442             ovsdb_idl_delete_row(row);
443         } else {
444             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
445                          "from table %s",
446                          UUID_ARGS(uuid), table->class->name);
447         }
448     } else if (!old) {
449         /* Insert row. */
450         if (!row) {
451             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
452         } else if (ovsdb_idl_row_is_orphan(row)) {
453             ovsdb_idl_insert_row(row, new);
454         } else {
455             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
456                          "table %s", UUID_ARGS(uuid), table->class->name);
457             ovsdb_idl_modify_row(row, new);
458         }
459     } else {
460         /* Modify row. */
461         if (row) {
462             /* XXX perhaps we should check the 'old' values? */
463             if (!ovsdb_idl_row_is_orphan(row)) {
464                 ovsdb_idl_modify_row(row, new);
465             } else {
466                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
467                              "referenced row "UUID_FMT" in table %s",
468                              UUID_ARGS(uuid), table->class->name);
469                 ovsdb_idl_insert_row(row, new);
470             }
471         } else {
472             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
473                          "in table %s", UUID_ARGS(uuid), table->class->name);
474             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
475         }
476     }
477 }
478
479 static void
480 ovsdb_idl_row_update(struct ovsdb_idl_row *row, const struct json *row_json)
481 {
482     struct ovsdb_idl_table *table = row->table;
483     struct shash_node *node;
484
485     SHASH_FOR_EACH (node, json_object(row_json)) {
486         const char *column_name = node->name;
487         const struct ovsdb_idl_column *column;
488         struct ovsdb_datum datum;
489         struct ovsdb_error *error;
490
491         column = shash_find_data(&table->columns, column_name);
492         if (!column) {
493             VLOG_WARN_RL(&syntax_rl, "unknown column %s updating row "UUID_FMT,
494                          column_name, UUID_ARGS(&row->uuid));
495             continue;
496         }
497
498         error = ovsdb_datum_from_json(&datum, &column->type, node->data, NULL);
499         if (!error) {
500             ovsdb_datum_swap(&row->old[column - table->class->columns],
501                              &datum);
502             ovsdb_datum_destroy(&datum, &column->type);
503         } else {
504             char *s = ovsdb_error_to_string(error);
505             VLOG_WARN_RL(&syntax_rl, "error parsing column %s in row "UUID_FMT
506                          " in table %s: %s", column_name,
507                          UUID_ARGS(&row->uuid), table->class->name, s);
508             free(s);
509             ovsdb_error_destroy(error);
510         }
511     }
512 }
513
514 static bool
515 ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *row)
516 {
517     return !row->old;
518 }
519
520 static void
521 ovsdb_idl_row_clear_old(struct ovsdb_idl_row *row)
522 {
523     assert(row->old == row->new);
524     if (!ovsdb_idl_row_is_orphan(row)) {
525         const struct ovsdb_idl_table_class *class = row->table->class;
526         size_t i;
527
528         for (i = 0; i < class->n_columns; i++) {
529             ovsdb_datum_destroy(&row->old[i], &class->columns[i].type);
530         }
531         free(row->old);
532         row->old = row->new = NULL;
533     }
534 }
535
536 static void
537 ovsdb_idl_row_clear_new(struct ovsdb_idl_row *row)
538 {
539     if (row->old != row->new) {
540         if (row->new) {
541             const struct ovsdb_idl_table_class *class = row->table->class;
542             size_t i;
543
544             BITMAP_FOR_EACH_1 (i, class->n_columns, row->written) {
545                 ovsdb_datum_destroy(&row->new[i], &class->columns[i].type);
546             }
547             free(row->new);
548             free(row->written);
549             row->written = NULL;
550         }
551         row->new = row->old;
552     }
553 }
554
555 static void
556 ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *row, bool destroy_dsts)
557 {
558     struct ovsdb_idl_arc *arc, *next;
559
560     /* Delete all forward arcs.  If 'destroy_dsts', destroy any orphaned rows
561      * that this causes to be unreferenced. */
562     LIST_FOR_EACH_SAFE (arc, next, struct ovsdb_idl_arc, src_node,
563                         &row->src_arcs) {
564         list_remove(&arc->dst_node);
565         if (destroy_dsts
566             && ovsdb_idl_row_is_orphan(arc->dst)
567             && list_is_empty(&arc->dst->dst_arcs)) {
568             ovsdb_idl_row_destroy(arc->dst);
569         }
570         free(arc);
571     }
572     list_init(&row->src_arcs);
573 }
574
575 /* Force nodes that reference 'row' to reparse. */
576 static void
577 ovsdb_idl_row_reparse_backrefs(struct ovsdb_idl_row *row, bool destroy_dsts)
578 {
579     struct ovsdb_idl_arc *arc, *next;
580
581     /* This is trickier than it looks.  ovsdb_idl_row_clear_arcs() will destroy
582      * 'arc', so we need to use the "safe" variant of list traversal.  However,
583      * calling ref->table->class->parse will add an arc equivalent to 'arc' to
584      * row->arcs.  That could be a problem for traversal, but it adds it at the
585      * beginning of the list to prevent us from stumbling upon it again.
586      *
587      * (If duplicate arcs were possible then we would need to make sure that
588      * 'next' didn't also point into 'arc''s destination, but we forbid
589      * duplicate arcs.) */
590     LIST_FOR_EACH_SAFE (arc, next, struct ovsdb_idl_arc, dst_node,
591                         &row->dst_arcs) {
592         struct ovsdb_idl_row *ref = arc->src;
593
594         (ref->table->class->unparse)(ref);
595         ovsdb_idl_row_clear_arcs(ref, destroy_dsts);
596         (ref->table->class->parse)(ref);
597     }
598 }
599
600 static struct ovsdb_idl_row *
601 ovsdb_idl_row_create__(const struct ovsdb_idl_table_class *class)
602 {
603     struct ovsdb_idl_row *row = xzalloc(class->allocation_size);
604     memset(row, 0, sizeof *row);
605     list_init(&row->src_arcs);
606     list_init(&row->dst_arcs);
607     hmap_node_nullify(&row->txn_node);
608     return row;
609 }
610
611 static struct ovsdb_idl_row *
612 ovsdb_idl_row_create(struct ovsdb_idl_table *table, const struct uuid *uuid)
613 {
614     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(table->class);
615     hmap_insert(&table->rows, &row->hmap_node, uuid_hash(uuid));
616     row->uuid = *uuid;
617     row->table = table;
618     return row;
619 }
620
621 static void
622 ovsdb_idl_row_destroy(struct ovsdb_idl_row *row)
623 {
624     if (row) {
625         ovsdb_idl_row_clear_old(row);
626         hmap_remove(&row->table->rows, &row->hmap_node);
627         free(row);
628     }
629 }
630
631 static void
632 ovsdb_idl_insert_row(struct ovsdb_idl_row *row, const struct json *row_json)
633 {
634     const struct ovsdb_idl_table_class *class = row->table->class;
635     size_t i;
636
637     assert(!row->old && !row->new);
638     row->old = row->new = xmalloc(class->n_columns * sizeof *row->old);
639     for (i = 0; i < class->n_columns; i++) {
640         ovsdb_datum_init_default(&row->old[i], &class->columns[i].type);
641     }
642     ovsdb_idl_row_update(row, row_json);
643     (class->parse)(row);
644
645     ovsdb_idl_row_reparse_backrefs(row, false);
646 }
647
648 static void
649 ovsdb_idl_delete_row(struct ovsdb_idl_row *row)
650 {
651     (row->table->class->unparse)(row);
652     ovsdb_idl_row_clear_arcs(row, true);
653     ovsdb_idl_row_clear_old(row);
654     if (list_is_empty(&row->dst_arcs)) {
655         ovsdb_idl_row_destroy(row);
656     } else {
657         ovsdb_idl_row_reparse_backrefs(row, true);
658     }
659 }
660
661 static void
662 ovsdb_idl_modify_row(struct ovsdb_idl_row *row, const struct json *row_json)
663 {
664     (row->table->class->unparse)(row);
665     ovsdb_idl_row_clear_arcs(row, true);
666     ovsdb_idl_row_update(row, row_json);
667     (row->table->class->parse)(row);
668 }
669
670 static bool
671 may_add_arc(const struct ovsdb_idl_row *src, const struct ovsdb_idl_row *dst)
672 {
673     const struct ovsdb_idl_arc *arc;
674
675     /* No self-arcs. */
676     if (src == dst) {
677         return false;
678     }
679
680     /* No duplicate arcs.
681      *
682      * We only need to test whether the first arc in dst->dst_arcs originates
683      * at 'src', since we add all of the arcs from a given source in a clump
684      * (in a single call to a row's ->parse function) and new arcs are always
685      * added at the front of the dst_arcs list. */
686     if (list_is_empty(&dst->dst_arcs)) {
687         return true;
688     }
689     arc = CONTAINER_OF(dst->dst_arcs.next, struct ovsdb_idl_arc, dst_node);
690     return arc->src != src;
691 }
692
693 static struct ovsdb_idl_table *
694 ovsdb_idl_table_from_class(const struct ovsdb_idl *idl,
695                            const struct ovsdb_idl_table_class *table_class)
696 {
697     return &idl->tables[table_class - idl->class->tables];
698 }
699
700 struct ovsdb_idl_row *
701 ovsdb_idl_get_row_arc(struct ovsdb_idl_row *src,
702                       struct ovsdb_idl_table_class *dst_table_class,
703                       const struct uuid *dst_uuid)
704 {
705     struct ovsdb_idl *idl = src->table->idl;
706     struct ovsdb_idl_table *dst_table;
707     struct ovsdb_idl_arc *arc;
708     struct ovsdb_idl_row *dst;
709
710     dst_table = ovsdb_idl_table_from_class(idl, dst_table_class);
711     dst = ovsdb_idl_get_row(dst_table, dst_uuid);
712     if (!dst) {
713         dst = ovsdb_idl_row_create(dst_table, dst_uuid);
714     }
715
716     /* Add a new arc, if it wouldn't be a self-arc or a duplicate arc. */
717     if (may_add_arc(src, dst)) {
718         /* The arc *must* be added at the front of the dst_arcs list.  See
719          * ovsdb_idl_row_reparse_backrefs() for details. */
720         arc = xmalloc(sizeof *arc);
721         list_push_front(&src->src_arcs, &arc->src_node);
722         list_push_front(&dst->dst_arcs, &arc->dst_node);
723         arc->src = src;
724         arc->dst = dst;
725     }
726
727     return !ovsdb_idl_row_is_orphan(dst) ? dst : NULL;
728 }
729
730 static struct ovsdb_idl_row *
731 next_real_row(struct ovsdb_idl_table *table, struct hmap_node *node)
732 {
733     for (; node; node = hmap_next(&table->rows, node)) {
734         struct ovsdb_idl_row *row;
735
736         row = CONTAINER_OF(node, struct ovsdb_idl_row, hmap_node);
737         if (!ovsdb_idl_row_is_orphan(row)) {
738             return row;
739         }
740     }
741     return NULL;
742 }
743
744 struct ovsdb_idl_row *
745 ovsdb_idl_first_row(const struct ovsdb_idl *idl,
746                     const struct ovsdb_idl_table_class *table_class)
747 {
748     struct ovsdb_idl_table *table
749         = ovsdb_idl_table_from_class(idl, table_class);
750     return next_real_row(table, hmap_first(&table->rows));
751 }
752
753 struct ovsdb_idl_row *
754 ovsdb_idl_next_row(const struct ovsdb_idl_row *row)
755 {
756     struct ovsdb_idl_table *table = row->table;
757
758     return next_real_row(table, hmap_next(&table->rows, &row->hmap_node));
759 }
760 \f
761 /* Transactions. */
762
763 static void ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
764                                    enum ovsdb_idl_txn_status);
765
766 const char *
767 ovsdb_idl_txn_status_to_string(enum ovsdb_idl_txn_status status)
768 {
769     switch (status) {
770     case TXN_INCOMPLETE:
771         return "incomplete";
772     case TXN_ABORTED:
773         return "aborted";
774     case TXN_SUCCESS:
775         return "success";
776     case TXN_TRY_AGAIN:
777         return "try again";
778     case TXN_ERROR:
779         return "error";
780     }
781     return "<unknown>";
782 }
783
784 struct ovsdb_idl_txn *
785 ovsdb_idl_txn_create(struct ovsdb_idl *idl)
786 {
787     struct ovsdb_idl_txn *txn;
788
789     assert(!idl->txn);
790     idl->txn = txn = xmalloc(sizeof *txn);
791     txn->idl = idl;
792     txn->status = TXN_INCOMPLETE;
793     hmap_init(&txn->txn_rows);
794     return txn;
795 }
796
797 void
798 ovsdb_idl_txn_destroy(struct ovsdb_idl_txn *txn)
799 {
800     ovsdb_idl_txn_abort(txn);
801     free(txn);
802 }
803
804 void
805 ovsdb_idl_txn_wait(const struct ovsdb_idl_txn *txn)
806 {
807     if (txn->status != TXN_INCOMPLETE) {
808         poll_immediate_wake();
809     }
810 }
811
812 static struct json *
813 where_uuid_equals(const struct uuid *uuid)
814 {
815     return
816         json_array_create_1(
817             json_array_create_3(
818                 json_string_create("_uuid"),
819                 json_string_create("=="),
820                 json_array_create_2(
821                     json_string_create("uuid"),
822                     json_string_create_nocopy(
823                         xasprintf(UUID_FMT, UUID_ARGS(uuid))))));
824 }
825
826 static char *
827 uuid_name_from_uuid(const struct uuid *uuid)
828 {
829     char *name;
830     char *p;
831
832     name = xasprintf("row"UUID_FMT, UUID_ARGS(uuid));
833     for (p = name; *p != '\0'; p++) {
834         if (*p == '-') {
835             *p = '_';
836         }
837     }
838
839     return name;
840 }
841
842 static const struct ovsdb_idl_row *
843 ovsdb_idl_txn_get_row(const struct ovsdb_idl_txn *txn, const struct uuid *uuid)
844 {
845     const struct ovsdb_idl_row *row;
846
847     HMAP_FOR_EACH_WITH_HASH (row, struct ovsdb_idl_row, txn_node,
848                              uuid_hash(uuid), &txn->txn_rows) {
849         if (uuid_equals(&row->uuid, uuid)) {
850             return row;
851         }
852     }
853     return NULL;
854 }
855
856 /* XXX there must be a cleaner way to do this */
857 static struct json *
858 substitute_uuids(struct json *json, const struct ovsdb_idl_txn *txn)
859 {
860     if (json->type == JSON_ARRAY) {
861         struct uuid uuid;
862         size_t i;
863
864         if (json->u.array.n == 2
865             && json->u.array.elems[0]->type == JSON_STRING
866             && json->u.array.elems[1]->type == JSON_STRING
867             && !strcmp(json->u.array.elems[0]->u.string, "uuid")
868             && uuid_from_string(&uuid, json->u.array.elems[1]->u.string)) {
869             const struct ovsdb_idl_row *row;
870
871             row = ovsdb_idl_txn_get_row(txn, &uuid);
872             if (row && !row->old && row->new) {
873                 json_destroy(json);
874
875                 return json_array_create_2(
876                     json_string_create("named-uuid"),
877                     json_string_create_nocopy(uuid_name_from_uuid(&uuid)));
878             }
879         }
880
881         for (i = 0; i < json->u.array.n; i++) {
882             json->u.array.elems[i] = substitute_uuids(json->u.array.elems[i],
883                                                       txn);
884         }
885     } else if (json->type == JSON_OBJECT) {
886         struct shash_node *node;
887
888         SHASH_FOR_EACH (node, json_object(json)) {
889             node->data = substitute_uuids(node->data, txn);
890         }
891     }
892     return json;
893 }
894
895 static void
896 ovsdb_idl_txn_disassemble(struct ovsdb_idl_txn *txn)
897 {
898     struct ovsdb_idl_row *row, *next;
899
900     HMAP_FOR_EACH_SAFE (row, next, struct ovsdb_idl_row, txn_node,
901                         &txn->txn_rows) {
902         if (row->old && row->written) {
903             (row->table->class->unparse)(row);
904             ovsdb_idl_row_clear_arcs(row, false);
905             (row->table->class->parse)(row);
906         }
907         ovsdb_idl_row_clear_new(row);
908
909         free(row->prereqs);
910         row->prereqs = NULL;
911
912         free(row->written);
913         row->written = NULL;
914
915         hmap_remove(&txn->txn_rows, &row->txn_node);
916         hmap_node_nullify(&row->txn_node);
917     }
918     hmap_destroy(&txn->txn_rows);
919     hmap_init(&txn->txn_rows);
920 }
921
922 enum ovsdb_idl_txn_status
923 ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
924 {
925     struct ovsdb_idl_row *row;
926     struct json *operations;
927     bool any_updates;
928
929     if (txn != txn->idl->txn) {
930         return txn->status;
931     }
932
933     operations = json_array_create_empty();
934
935     /* Add prerequisites and declarations of new rows. */
936     HMAP_FOR_EACH (row, struct ovsdb_idl_row, txn_node, &txn->txn_rows) {
937         /* XXX check that deleted rows exist even if no prereqs? */
938         if (row->prereqs) {
939             const struct ovsdb_idl_table_class *class = row->table->class;
940             size_t n_columns = class->n_columns;
941             struct json *op, *columns, *row_json;
942             size_t idx;
943
944             op = json_object_create();
945             json_array_add(operations, op);
946             json_object_put_string(op, "op", "wait");
947             json_object_put_string(op, "table", class->name);
948             json_object_put(op, "timeout", json_integer_create(0));
949             json_object_put(op, "where", where_uuid_equals(&row->uuid));
950             json_object_put_string(op, "until", "==");
951             columns = json_array_create_empty();
952             json_object_put(op, "columns", columns);
953             row_json = json_object_create();
954             json_object_put(op, "rows", json_array_create_1(row_json));
955
956             BITMAP_FOR_EACH_1 (idx, n_columns, row->prereqs) {
957                 const struct ovsdb_idl_column *column = &class->columns[idx];
958                 json_array_add(columns, json_string_create(column->name));
959                 json_object_put(row_json, column->name,
960                                 ovsdb_datum_to_json(&row->old[idx],
961                                                     &column->type));
962             }
963         }
964         if (row->new && !row->old) {
965             struct json *op;
966
967             op = json_object_create();
968             json_array_add(operations, op);
969             json_object_put_string(op, "op", "declare");
970             json_object_put(op, "uuid-name",
971                             json_string_create_nocopy(
972                                 uuid_name_from_uuid(&row->uuid)));
973         }
974     }
975
976     /* Add updates. */
977     any_updates = false;
978     HMAP_FOR_EACH (row, struct ovsdb_idl_row, txn_node, &txn->txn_rows) {
979         const struct ovsdb_idl_table_class *class = row->table->class;
980         size_t n_columns = class->n_columns;
981         struct json *row_json;
982         size_t idx;
983
984         if (row->old == row->new) {
985             continue;
986         } else if (!row->new) {
987             struct json *op = json_object_create();
988             json_object_put_string(op, "op", "delete");
989             json_object_put_string(op, "table", class->name);
990             json_object_put(op, "where", where_uuid_equals(&row->uuid));
991             json_array_add(operations, op);
992         } else {
993             row_json = NULL;
994             BITMAP_FOR_EACH_1 (idx, n_columns, row->written) {
995                 const struct ovsdb_idl_column *column = &class->columns[idx];
996
997                 if (row->old
998                     && ovsdb_datum_equals(&row->old[idx], &row->new[idx],
999                                           &column->type)) {
1000                     continue;
1001                 }
1002                 if (!row_json) {
1003                     struct json *op = json_object_create();
1004                     json_array_add(operations, op);
1005                     json_object_put_string(op, "op",
1006                                            row->old ? "update" : "insert");
1007                     json_object_put_string(op, "table", class->name);
1008                     if (row->old) {
1009                         json_object_put(op, "where",
1010                                         where_uuid_equals(&row->uuid));
1011                     } else {
1012                         json_object_put(op, "uuid-name",
1013                                         json_string_create_nocopy(
1014                                             uuid_name_from_uuid(&row->uuid)));
1015                     }
1016                     row_json = json_object_create();
1017                     json_object_put(op, "row", row_json);
1018                     any_updates = true;
1019                 }
1020                 json_object_put(row_json, column->name,
1021                                 substitute_uuids(
1022                                     ovsdb_datum_to_json(&row->new[idx],
1023                                                         &column->type),
1024                                     txn));
1025             }
1026         }
1027     }
1028
1029     if (!any_updates) {
1030         txn->status = TXN_SUCCESS;
1031     } else if (!jsonrpc_session_send(
1032                    txn->idl->session,
1033                    jsonrpc_create_request(
1034                        "transact", operations, &txn->request_id))) {
1035         hmap_insert(&txn->idl->outstanding_txns, &txn->hmap_node,
1036                     json_hash(txn->request_id, 0));
1037     } else {
1038         txn->status = TXN_INCOMPLETE;
1039     }
1040
1041     txn->idl->txn = NULL;
1042     ovsdb_idl_txn_disassemble(txn);
1043     return txn->status;
1044 }
1045
1046 void
1047 ovsdb_idl_txn_abort(struct ovsdb_idl_txn *txn)
1048 {
1049     ovsdb_idl_txn_disassemble(txn);
1050     if (txn->status == TXN_INCOMPLETE) {
1051         txn->status = TXN_ABORTED;
1052     }
1053 }
1054
1055 static void
1056 ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
1057                        enum ovsdb_idl_txn_status status)
1058 {
1059     txn->status = status;
1060     hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
1061 }
1062
1063 void
1064 ovsdb_idl_txn_write(struct ovsdb_idl_row *row,
1065                     const struct ovsdb_idl_column *column,
1066                     struct ovsdb_datum *datum)
1067 {
1068     const struct ovsdb_idl_table_class *class = row->table->class;
1069     size_t column_idx = column - class->columns;
1070
1071     assert(row->new);
1072     if (hmap_node_is_null(&row->txn_node)) {
1073         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
1074                     uuid_hash(&row->uuid));
1075     }
1076     if (row->old == row->new) {
1077         row->new = xmalloc(class->n_columns * sizeof *row->new);
1078     }
1079     if (!row->written) {
1080         row->written = bitmap_allocate(class->n_columns);
1081     }
1082     if (bitmap_is_set(row->written, column_idx)) {
1083         ovsdb_datum_destroy(&row->new[column_idx], &column->type);
1084     } else {
1085         bitmap_set1(row->written, column_idx);
1086     }
1087     row->new[column_idx] = *datum;
1088 }
1089
1090 void
1091 ovsdb_idl_txn_verify(const struct ovsdb_idl_row *row_,
1092                      const struct ovsdb_idl_column *column)
1093 {
1094     struct ovsdb_idl_row *row = (struct ovsdb_idl_row *) row_;
1095     const struct ovsdb_idl_table_class *class = row->table->class;
1096     size_t column_idx = column - class->columns;
1097
1098     assert(row->new);
1099     if (!row->old
1100         || (row->written && bitmap_is_set(row->written, column_idx))) {
1101         return;
1102     }
1103
1104     if (hmap_node_is_null(&row->txn_node)) {
1105         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
1106                     uuid_hash(&row->uuid));
1107     }
1108     if (!row->prereqs) {
1109         row->prereqs = bitmap_allocate(class->n_columns);
1110     }
1111     bitmap_set1(row->prereqs, column_idx);
1112 }
1113
1114 void
1115 ovsdb_idl_txn_delete(struct ovsdb_idl_row *row)
1116 {
1117     assert(row->new);
1118     if (!row->old) {
1119         ovsdb_idl_row_clear_new(row);
1120         assert(!row->prereqs);
1121         hmap_remove(&row->table->idl->txn->txn_rows, &row->txn_node);
1122         free(row);
1123     }
1124     if (hmap_node_is_null(&row->txn_node)) {
1125         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
1126                     uuid_hash(&row->uuid));
1127     }
1128     if (row->new == row->old) {
1129         row->new = NULL;
1130     } else {
1131         ovsdb_idl_row_clear_new(row);
1132     }
1133 }
1134
1135 struct ovsdb_idl_row *
1136 ovsdb_idl_txn_insert(struct ovsdb_idl_txn *txn,
1137                      const struct ovsdb_idl_table_class *class)
1138 {
1139     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(class);
1140     uuid_generate(&row->uuid);
1141     row->table = ovsdb_idl_table_from_class(txn->idl, class);
1142     row->new = xmalloc(class->n_columns * sizeof *row->new);
1143     row->written = bitmap_allocate(class->n_columns);
1144     hmap_insert(&txn->txn_rows, &row->txn_node, uuid_hash(&row->uuid));
1145     return row;
1146 }
1147
1148 static void
1149 ovsdb_idl_txn_abort_all(struct ovsdb_idl *idl)
1150 {
1151     struct ovsdb_idl_txn *txn;
1152
1153     HMAP_FOR_EACH (txn, struct ovsdb_idl_txn, hmap_node,
1154                    &idl->outstanding_txns) {
1155         ovsdb_idl_txn_complete(txn, TXN_TRY_AGAIN);
1156     }
1157 }
1158
1159 static struct ovsdb_idl_txn *
1160 ovsdb_idl_txn_find(struct ovsdb_idl *idl, const struct json *id)
1161 {
1162     struct ovsdb_idl_txn *txn;
1163
1164     HMAP_FOR_EACH_WITH_HASH (txn, struct ovsdb_idl_txn, hmap_node,
1165                              json_hash(id, 0), &idl->outstanding_txns) {
1166         if (json_equal(id, txn->request_id)) {
1167             return txn;
1168         }
1169     }
1170     return NULL;
1171 }
1172
1173 static bool
1174 ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
1175                             const struct jsonrpc_msg *msg)
1176 {
1177     struct ovsdb_idl_txn *txn;
1178     enum ovsdb_idl_txn_status status;
1179
1180     txn = ovsdb_idl_txn_find(idl, msg->id);
1181     if (!txn) {
1182         return false;
1183     }
1184
1185     if (msg->type == JSONRPC_ERROR) {
1186         status = TXN_ERROR;
1187     } else if (msg->result->type != JSON_ARRAY) {
1188         VLOG_WARN_RL(&syntax_rl, "reply to \"transact\" is not JSON array");
1189         status = TXN_ERROR;
1190     } else {
1191         int hard_errors = 0;
1192         int soft_errors = 0;
1193         size_t i;
1194
1195         for (i = 0; i < msg->result->u.array.n; i++) {
1196             struct json *json = msg->result->u.array.elems[i];
1197
1198             if (json->type == JSON_NULL) {
1199                 /* This isn't an error in itself but indicates that some prior
1200                  * operation failed, so make sure that we know about it. */
1201                 soft_errors++;
1202             } else if (json->type == JSON_OBJECT) {
1203                 struct json *error;
1204
1205                 error = shash_find_data(json_object(json), "error");
1206                 if (error) {
1207                     if (error->type == JSON_STRING) {
1208                         if (!strcmp(error->u.string, "timed out")) {
1209                             soft_errors++;
1210                         } else {
1211                             hard_errors++;
1212                         }
1213                     } else {
1214                         hard_errors++;
1215                         VLOG_WARN_RL(&syntax_rl,
1216                                      "\"error\" in reply is not JSON string");
1217                     }
1218                 }
1219             } else {
1220                 hard_errors++;
1221                 VLOG_WARN_RL(&syntax_rl,
1222                              "operation reply is not JSON null or object");
1223             }
1224         }
1225
1226         status = (hard_errors ? TXN_ERROR
1227                   : soft_errors ? TXN_TRY_AGAIN
1228                   : TXN_SUCCESS);
1229     }
1230
1231     ovsdb_idl_txn_complete(txn, status);
1232     return true;
1233 }
1234
1235 struct ovsdb_idl_txn *
1236 ovsdb_idl_txn_get(const struct ovsdb_idl_row *row)
1237 {
1238     struct ovsdb_idl_txn *txn = row->table->idl->txn;
1239     assert(txn != NULL);
1240     return txn;
1241 }