ovsdb: Add "comment" feature to transactions and make ovs-vsctl use them.
[sliver-openvswitch.git] / lib / ovsdb-idl.c
1 /* Copyright (c) 2009 Nicira Networks.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "ovsdb-idl.h"
19
20 #include <assert.h>
21 #include <errno.h>
22 #include <limits.h>
23 #include <stdlib.h>
24
25 #include "bitmap.h"
26 #include "dynamic-string.h"
27 #include "json.h"
28 #include "jsonrpc.h"
29 #include "ovsdb-data.h"
30 #include "ovsdb-error.h"
31 #include "ovsdb-idl-provider.h"
32 #include "poll-loop.h"
33 #include "shash.h"
34 #include "util.h"
35
36 #define THIS_MODULE VLM_ovsdb_idl
37 #include "vlog.h"
38
39 /* An arc from one idl_row to another.  When row A contains a UUID that
40  * references row B, this is represented by an arc from A (the source) to B
41  * (the destination).
42  *
43  * Arcs from a row to itself are omitted, that is, src and dst are always
44  * different.
45  *
46  * Arcs are never duplicated, that is, even if there are multiple references
47  * from A to B, there is only a single arc from A to B.
48  *
49  * Arcs are directed: an arc from A to B is the converse of an an arc from B to
50  * A.  Both an arc and its converse may both be present, if each row refers
51  * to the other circularly.
52  *
53  * The source and destination row may be in the same table or in different
54  * tables.
55  */
56 struct ovsdb_idl_arc {
57     struct list src_node;       /* In src->src_arcs list. */
58     struct list dst_node;       /* In dst->dst_arcs list. */
59     struct ovsdb_idl_row *src;  /* Source row. */
60     struct ovsdb_idl_row *dst;  /* Destination row. */
61 };
62
63 struct ovsdb_idl {
64     const struct ovsdb_idl_class *class;
65     struct jsonrpc_session *session;
66     struct shash table_by_name;
67     struct ovsdb_idl_table *tables;
68     struct json *monitor_request_id;
69     unsigned int last_monitor_request_seqno;
70     unsigned int change_seqno;
71
72     /* Transaction support. */
73     struct ovsdb_idl_txn *txn;
74     struct hmap outstanding_txns;
75 };
76
77 struct ovsdb_idl_txn {
78     struct hmap_node hmap_node;
79     struct json *request_id;
80     struct ovsdb_idl *idl;
81     struct hmap txn_rows;
82     enum ovsdb_idl_txn_status status;
83     bool dry_run;
84     struct ds comment;
85 };
86
87 static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);
88 static struct vlog_rate_limit semantic_rl = VLOG_RATE_LIMIT_INIT(1, 5);
89
90 static void ovsdb_idl_clear(struct ovsdb_idl *);
91 static void ovsdb_idl_send_monitor_request(struct ovsdb_idl *);
92 static void ovsdb_idl_parse_update(struct ovsdb_idl *, const struct json *);
93 static struct ovsdb_error *ovsdb_idl_parse_update__(struct ovsdb_idl *,
94                                                     const struct json *);
95 static void ovsdb_idl_process_update(struct ovsdb_idl_table *,
96                                      const struct uuid *,
97                                      const struct json *old,
98                                      const struct json *new);
99 static void ovsdb_idl_insert_row(struct ovsdb_idl_row *, const struct json *);
100 static void ovsdb_idl_delete_row(struct ovsdb_idl_row *);
101 static void ovsdb_idl_modify_row(struct ovsdb_idl_row *, const struct json *);
102
103 static bool ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *);
104 static struct ovsdb_idl_row *ovsdb_idl_row_create__(
105     const struct ovsdb_idl_table_class *);
106 static struct ovsdb_idl_row *ovsdb_idl_row_create(struct ovsdb_idl_table *,
107                                                   const struct uuid *);
108 static void ovsdb_idl_row_destroy(struct ovsdb_idl_row *);
109
110 static void ovsdb_idl_row_clear_old(struct ovsdb_idl_row *);
111 static void ovsdb_idl_row_clear_new(struct ovsdb_idl_row *);
112
113 static void ovsdb_idl_txn_abort_all(struct ovsdb_idl *);
114 static bool ovsdb_idl_txn_process_reply(struct ovsdb_idl *,
115                                         const struct jsonrpc_msg *msg);
116
117 struct ovsdb_idl *
118 ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class)
119 {
120     struct ovsdb_idl *idl;
121     size_t i;
122
123     idl = xzalloc(sizeof *idl);
124     idl->class = class;
125     idl->session = jsonrpc_session_open(remote);
126     shash_init(&idl->table_by_name);
127     idl->tables = xmalloc(class->n_tables * sizeof *idl->tables);
128     for (i = 0; i < class->n_tables; i++) {
129         const struct ovsdb_idl_table_class *tc = &class->tables[i];
130         struct ovsdb_idl_table *table = &idl->tables[i];
131         size_t j;
132
133         assert(!shash_find(&idl->table_by_name, tc->name));
134         shash_add(&idl->table_by_name, tc->name, table);
135         table->class = tc;
136         shash_init(&table->columns);
137         for (j = 0; j < tc->n_columns; j++) {
138             const struct ovsdb_idl_column *column = &tc->columns[j];
139
140             assert(!shash_find(&table->columns, column->name));
141             shash_add(&table->columns, column->name, column);
142         }
143         hmap_init(&table->rows);
144         table->idl = idl;
145     }
146     idl->last_monitor_request_seqno = UINT_MAX;
147     hmap_init(&idl->outstanding_txns);
148
149     return idl;
150 }
151
152 void
153 ovsdb_idl_destroy(struct ovsdb_idl *idl)
154 {
155     if (idl) {
156         size_t i;
157
158         assert(!idl->txn);
159         ovsdb_idl_clear(idl);
160         jsonrpc_session_close(idl->session);
161
162         for (i = 0; i < idl->class->n_tables; i++) {
163             struct ovsdb_idl_table *table = &idl->tables[i];
164             shash_destroy(&table->columns);
165             hmap_destroy(&table->rows);
166         }
167         shash_destroy(&idl->table_by_name);
168         free(idl->tables);
169         json_destroy(idl->monitor_request_id);
170         free(idl);
171     }
172 }
173
174 static void
175 ovsdb_idl_clear(struct ovsdb_idl *idl)
176 {
177     bool changed = false;
178     size_t i;
179
180     for (i = 0; i < idl->class->n_tables; i++) {
181         struct ovsdb_idl_table *table = &idl->tables[i];
182         struct ovsdb_idl_row *row, *next_row;
183
184         if (hmap_is_empty(&table->rows)) {
185             continue;
186         }
187
188         changed = true;
189         HMAP_FOR_EACH_SAFE (row, next_row, struct ovsdb_idl_row, hmap_node,
190                             &table->rows) {
191             struct ovsdb_idl_arc *arc, *next_arc;
192
193             if (!ovsdb_idl_row_is_orphan(row)) {
194                 (row->table->class->unparse)(row);
195                 ovsdb_idl_row_clear_old(row);
196             }
197             hmap_remove(&table->rows, &row->hmap_node);
198             LIST_FOR_EACH_SAFE (arc, next_arc, struct ovsdb_idl_arc, src_node,
199                                 &row->src_arcs) {
200                 free(arc);
201             }
202             /* No need to do anything with dst_arcs: some node has those arcs
203              * as forward arcs and will destroy them itself. */
204
205             free(row);
206         }
207     }
208
209     if (changed) {
210         idl->change_seqno++;
211     }
212 }
213
214 void
215 ovsdb_idl_run(struct ovsdb_idl *idl)
216 {
217     int i;
218
219     assert(!idl->txn);
220     jsonrpc_session_run(idl->session);
221     for (i = 0; jsonrpc_session_is_connected(idl->session) && i < 50; i++) {
222         struct jsonrpc_msg *msg, *reply;
223         unsigned int seqno;
224
225         seqno = jsonrpc_session_get_seqno(idl->session);
226         if (idl->last_monitor_request_seqno != seqno) {
227             idl->last_monitor_request_seqno = seqno;
228             ovsdb_idl_txn_abort_all(idl);
229             ovsdb_idl_send_monitor_request(idl);
230             break;
231         }
232
233         msg = jsonrpc_session_recv(idl->session);
234         if (!msg) {
235             break;
236         }
237
238         reply = NULL;
239         if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
240             reply = jsonrpc_create_reply(json_clone(msg->params), msg->id);
241         } else if (msg->type == JSONRPC_NOTIFY
242                    && !strcmp(msg->method, "update")
243                    && msg->params->type == JSON_ARRAY
244                    && msg->params->u.array.n == 2
245                    && msg->params->u.array.elems[0]->type == JSON_NULL) {
246             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1]);
247         } else if (msg->type == JSONRPC_REPLY
248                    && idl->monitor_request_id
249                    && json_equal(idl->monitor_request_id, msg->id)) {
250             json_destroy(idl->monitor_request_id);
251             idl->monitor_request_id = NULL;
252             ovsdb_idl_clear(idl);
253             ovsdb_idl_parse_update(idl, msg->result);
254         } else if (msg->type == JSONRPC_REPLY
255                    && msg->id && msg->id->type == JSON_STRING
256                    && !strcmp(msg->id->u.string, "echo")) {
257             /* It's a reply to our echo request.  Ignore it. */
258         } else if ((msg->type == JSONRPC_ERROR
259                     || msg->type == JSONRPC_REPLY)
260                    && ovsdb_idl_txn_process_reply(idl, msg)) {
261             /* ovsdb_idl_txn_process_reply() did everything needful. */
262         } else {
263             VLOG_WARN("%s: received unexpected %s message",
264                       jsonrpc_session_get_name(idl->session),
265                       jsonrpc_msg_type_to_string(msg->type));
266             jsonrpc_session_force_reconnect(idl->session);
267         }
268         if (reply) {
269             jsonrpc_session_send(idl->session, reply);
270         }
271         jsonrpc_msg_destroy(msg);
272     }
273 }
274
275 void
276 ovsdb_idl_wait(struct ovsdb_idl *idl)
277 {
278     jsonrpc_session_wait(idl->session);
279     jsonrpc_session_recv_wait(idl->session);
280 }
281
282 unsigned int
283 ovsdb_idl_get_seqno(const struct ovsdb_idl *idl)
284 {
285     return idl->change_seqno;
286 }
287
288 void
289 ovsdb_idl_force_reconnect(struct ovsdb_idl *idl)
290 {
291     jsonrpc_session_force_reconnect(idl->session);
292 }
293 \f
294 static void
295 ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl)
296 {
297     struct json *monitor_requests;
298     struct jsonrpc_msg *msg;
299     size_t i;
300
301     monitor_requests = json_object_create();
302     for (i = 0; i < idl->class->n_tables; i++) {
303         const struct ovsdb_idl_table *table = &idl->tables[i];
304         const struct ovsdb_idl_table_class *tc = table->class;
305         struct json *monitor_request, *columns;
306         size_t i;
307
308         monitor_request = json_object_create();
309         columns = json_array_create_empty();
310         for (i = 0; i < tc->n_columns; i++) {
311             const struct ovsdb_idl_column *column = &tc->columns[i];
312             json_array_add(columns, json_string_create(column->name));
313         }
314         json_object_put(monitor_request, "columns", columns);
315         json_object_put(monitor_requests, tc->name, monitor_request);
316     }
317
318     json_destroy(idl->monitor_request_id);
319     msg = jsonrpc_create_request(
320         "monitor", json_array_create_2(json_null_create(), monitor_requests),
321         &idl->monitor_request_id);
322     jsonrpc_session_send(idl->session, msg);
323 }
324
325 static void
326 ovsdb_idl_parse_update(struct ovsdb_idl *idl, const struct json *table_updates)
327 {
328     struct ovsdb_error *error;
329
330     idl->change_seqno++;
331
332     error = ovsdb_idl_parse_update__(idl, table_updates);
333     if (error) {
334         if (!VLOG_DROP_WARN(&syntax_rl)) {
335             char *s = ovsdb_error_to_string(error);
336             VLOG_WARN_RL(&syntax_rl, "%s", s);
337             free(s);
338         }
339         ovsdb_error_destroy(error);
340     }
341 }
342
343 static struct ovsdb_error *
344 ovsdb_idl_parse_update__(struct ovsdb_idl *idl,
345                          const struct json *table_updates)
346 {
347     const struct shash_node *tables_node;
348
349     if (table_updates->type != JSON_OBJECT) {
350         return ovsdb_syntax_error(table_updates, NULL,
351                                   "<table-updates> is not an object");
352     }
353     SHASH_FOR_EACH (tables_node, json_object(table_updates)) {
354         const struct json *table_update = tables_node->data;
355         const struct shash_node *table_node;
356         struct ovsdb_idl_table *table;
357
358         table = shash_find_data(&idl->table_by_name, tables_node->name);
359         if (!table) {
360             return ovsdb_syntax_error(
361                 table_updates, NULL,
362                 "<table-updates> includes unknown table \"%s\"",
363                 tables_node->name);
364         }
365
366         if (table_update->type != JSON_OBJECT) {
367             return ovsdb_syntax_error(table_update, NULL,
368                                       "<table-update> for table \"%s\" is "
369                                       "not an object", table->class->name);
370         }
371         SHASH_FOR_EACH (table_node, json_object(table_update)) {
372             const struct json *row_update = table_node->data;
373             const struct json *old_json, *new_json;
374             struct uuid uuid;
375
376             if (!uuid_from_string(&uuid, table_node->name)) {
377                 return ovsdb_syntax_error(table_update, NULL,
378                                           "<table-update> for table \"%s\" "
379                                           "contains bad UUID "
380                                           "\"%s\" as member name",
381                                           table->class->name,
382                                           table_node->name);
383             }
384             if (row_update->type != JSON_OBJECT) {
385                 return ovsdb_syntax_error(row_update, NULL,
386                                           "<table-update> for table \"%s\" "
387                                           "contains <row-update> for %s that "
388                                           "is not an object",
389                                           table->class->name,
390                                           table_node->name);
391             }
392
393             old_json = shash_find_data(json_object(row_update), "old");
394             new_json = shash_find_data(json_object(row_update), "new");
395             if (old_json && old_json->type != JSON_OBJECT) {
396                 return ovsdb_syntax_error(old_json, NULL,
397                                           "\"old\" <row> is not object");
398             } else if (new_json && new_json->type != JSON_OBJECT) {
399                 return ovsdb_syntax_error(new_json, NULL,
400                                           "\"new\" <row> is not object");
401             } else if ((old_json != NULL) + (new_json != NULL)
402                        != shash_count(json_object(row_update))) {
403                 return ovsdb_syntax_error(row_update, NULL,
404                                           "<row-update> contains unexpected "
405                                           "member");
406             } else if (!old_json && !new_json) {
407                 return ovsdb_syntax_error(row_update, NULL,
408                                           "<row-update> missing \"old\" "
409                                           "and \"new\" members");
410             }
411
412             ovsdb_idl_process_update(table, &uuid, old_json, new_json);
413         }
414     }
415
416     return NULL;
417 }
418
419 static struct ovsdb_idl_row *
420 ovsdb_idl_get_row(struct ovsdb_idl_table *table, const struct uuid *uuid)
421 {
422     struct ovsdb_idl_row *row;
423
424     HMAP_FOR_EACH_WITH_HASH (row, struct ovsdb_idl_row, hmap_node,
425                              uuid_hash(uuid), &table->rows) {
426         if (uuid_equals(&row->uuid, uuid)) {
427             return row;
428         }
429     }
430     return NULL;
431 }
432
433 static void
434 ovsdb_idl_process_update(struct ovsdb_idl_table *table,
435                          const struct uuid *uuid, const struct json *old,
436                          const struct json *new)
437 {
438     struct ovsdb_idl_row *row;
439
440     row = ovsdb_idl_get_row(table, uuid);
441     if (!new) {
442         /* Delete row. */
443         if (row && !ovsdb_idl_row_is_orphan(row)) {
444             /* XXX perhaps we should check the 'old' values? */
445             ovsdb_idl_delete_row(row);
446         } else {
447             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
448                          "from table %s",
449                          UUID_ARGS(uuid), table->class->name);
450         }
451     } else if (!old) {
452         /* Insert row. */
453         if (!row) {
454             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
455         } else if (ovsdb_idl_row_is_orphan(row)) {
456             ovsdb_idl_insert_row(row, new);
457         } else {
458             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
459                          "table %s", UUID_ARGS(uuid), table->class->name);
460             ovsdb_idl_modify_row(row, new);
461         }
462     } else {
463         /* Modify row. */
464         if (row) {
465             /* XXX perhaps we should check the 'old' values? */
466             if (!ovsdb_idl_row_is_orphan(row)) {
467                 ovsdb_idl_modify_row(row, new);
468             } else {
469                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
470                              "referenced row "UUID_FMT" in table %s",
471                              UUID_ARGS(uuid), table->class->name);
472                 ovsdb_idl_insert_row(row, new);
473             }
474         } else {
475             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
476                          "in table %s", UUID_ARGS(uuid), table->class->name);
477             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
478         }
479     }
480 }
481
482 static void
483 ovsdb_idl_row_update(struct ovsdb_idl_row *row, const struct json *row_json)
484 {
485     struct ovsdb_idl_table *table = row->table;
486     struct shash_node *node;
487
488     SHASH_FOR_EACH (node, json_object(row_json)) {
489         const char *column_name = node->name;
490         const struct ovsdb_idl_column *column;
491         struct ovsdb_datum datum;
492         struct ovsdb_error *error;
493
494         column = shash_find_data(&table->columns, column_name);
495         if (!column) {
496             VLOG_WARN_RL(&syntax_rl, "unknown column %s updating row "UUID_FMT,
497                          column_name, UUID_ARGS(&row->uuid));
498             continue;
499         }
500
501         error = ovsdb_datum_from_json(&datum, &column->type, node->data, NULL);
502         if (!error) {
503             ovsdb_datum_swap(&row->old[column - table->class->columns],
504                              &datum);
505             ovsdb_datum_destroy(&datum, &column->type);
506         } else {
507             char *s = ovsdb_error_to_string(error);
508             VLOG_WARN_RL(&syntax_rl, "error parsing column %s in row "UUID_FMT
509                          " in table %s: %s", column_name,
510                          UUID_ARGS(&row->uuid), table->class->name, s);
511             free(s);
512             ovsdb_error_destroy(error);
513         }
514     }
515 }
516
517 static bool
518 ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *row)
519 {
520     return !row->old;
521 }
522
523 static void
524 ovsdb_idl_row_clear_old(struct ovsdb_idl_row *row)
525 {
526     assert(row->old == row->new);
527     if (!ovsdb_idl_row_is_orphan(row)) {
528         const struct ovsdb_idl_table_class *class = row->table->class;
529         size_t i;
530
531         for (i = 0; i < class->n_columns; i++) {
532             ovsdb_datum_destroy(&row->old[i], &class->columns[i].type);
533         }
534         free(row->old);
535         row->old = row->new = NULL;
536     }
537 }
538
539 static void
540 ovsdb_idl_row_clear_new(struct ovsdb_idl_row *row)
541 {
542     if (row->old != row->new) {
543         if (row->new) {
544             const struct ovsdb_idl_table_class *class = row->table->class;
545             size_t i;
546
547             BITMAP_FOR_EACH_1 (i, class->n_columns, row->written) {
548                 ovsdb_datum_destroy(&row->new[i], &class->columns[i].type);
549             }
550             free(row->new);
551             free(row->written);
552             row->written = NULL;
553         }
554         row->new = row->old;
555     }
556 }
557
558 static void
559 ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *row, bool destroy_dsts)
560 {
561     struct ovsdb_idl_arc *arc, *next;
562
563     /* Delete all forward arcs.  If 'destroy_dsts', destroy any orphaned rows
564      * that this causes to be unreferenced. */
565     LIST_FOR_EACH_SAFE (arc, next, struct ovsdb_idl_arc, src_node,
566                         &row->src_arcs) {
567         list_remove(&arc->dst_node);
568         if (destroy_dsts
569             && ovsdb_idl_row_is_orphan(arc->dst)
570             && list_is_empty(&arc->dst->dst_arcs)) {
571             ovsdb_idl_row_destroy(arc->dst);
572         }
573         free(arc);
574     }
575     list_init(&row->src_arcs);
576 }
577
578 /* Force nodes that reference 'row' to reparse. */
579 static void
580 ovsdb_idl_row_reparse_backrefs(struct ovsdb_idl_row *row, bool destroy_dsts)
581 {
582     struct ovsdb_idl_arc *arc, *next;
583
584     /* This is trickier than it looks.  ovsdb_idl_row_clear_arcs() will destroy
585      * 'arc', so we need to use the "safe" variant of list traversal.  However,
586      * calling ref->table->class->parse will add an arc equivalent to 'arc' to
587      * row->arcs.  That could be a problem for traversal, but it adds it at the
588      * beginning of the list to prevent us from stumbling upon it again.
589      *
590      * (If duplicate arcs were possible then we would need to make sure that
591      * 'next' didn't also point into 'arc''s destination, but we forbid
592      * duplicate arcs.) */
593     LIST_FOR_EACH_SAFE (arc, next, struct ovsdb_idl_arc, dst_node,
594                         &row->dst_arcs) {
595         struct ovsdb_idl_row *ref = arc->src;
596
597         (ref->table->class->unparse)(ref);
598         ovsdb_idl_row_clear_arcs(ref, destroy_dsts);
599         (ref->table->class->parse)(ref);
600     }
601 }
602
603 static struct ovsdb_idl_row *
604 ovsdb_idl_row_create__(const struct ovsdb_idl_table_class *class)
605 {
606     struct ovsdb_idl_row *row = xzalloc(class->allocation_size);
607     memset(row, 0, sizeof *row);
608     list_init(&row->src_arcs);
609     list_init(&row->dst_arcs);
610     hmap_node_nullify(&row->txn_node);
611     return row;
612 }
613
614 static struct ovsdb_idl_row *
615 ovsdb_idl_row_create(struct ovsdb_idl_table *table, const struct uuid *uuid)
616 {
617     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(table->class);
618     hmap_insert(&table->rows, &row->hmap_node, uuid_hash(uuid));
619     row->uuid = *uuid;
620     row->table = table;
621     return row;
622 }
623
624 static void
625 ovsdb_idl_row_destroy(struct ovsdb_idl_row *row)
626 {
627     if (row) {
628         ovsdb_idl_row_clear_old(row);
629         hmap_remove(&row->table->rows, &row->hmap_node);
630         free(row);
631     }
632 }
633
634 static void
635 ovsdb_idl_insert_row(struct ovsdb_idl_row *row, const struct json *row_json)
636 {
637     const struct ovsdb_idl_table_class *class = row->table->class;
638     size_t i;
639
640     assert(!row->old && !row->new);
641     row->old = row->new = xmalloc(class->n_columns * sizeof *row->old);
642     for (i = 0; i < class->n_columns; i++) {
643         ovsdb_datum_init_default(&row->old[i], &class->columns[i].type);
644     }
645     ovsdb_idl_row_update(row, row_json);
646     (class->parse)(row);
647
648     ovsdb_idl_row_reparse_backrefs(row, false);
649 }
650
651 static void
652 ovsdb_idl_delete_row(struct ovsdb_idl_row *row)
653 {
654     (row->table->class->unparse)(row);
655     ovsdb_idl_row_clear_arcs(row, true);
656     ovsdb_idl_row_clear_old(row);
657     if (list_is_empty(&row->dst_arcs)) {
658         ovsdb_idl_row_destroy(row);
659     } else {
660         ovsdb_idl_row_reparse_backrefs(row, true);
661     }
662 }
663
664 static void
665 ovsdb_idl_modify_row(struct ovsdb_idl_row *row, const struct json *row_json)
666 {
667     (row->table->class->unparse)(row);
668     ovsdb_idl_row_clear_arcs(row, true);
669     ovsdb_idl_row_update(row, row_json);
670     (row->table->class->parse)(row);
671 }
672
673 static bool
674 may_add_arc(const struct ovsdb_idl_row *src, const struct ovsdb_idl_row *dst)
675 {
676     const struct ovsdb_idl_arc *arc;
677
678     /* No self-arcs. */
679     if (src == dst) {
680         return false;
681     }
682
683     /* No duplicate arcs.
684      *
685      * We only need to test whether the first arc in dst->dst_arcs originates
686      * at 'src', since we add all of the arcs from a given source in a clump
687      * (in a single call to a row's ->parse function) and new arcs are always
688      * added at the front of the dst_arcs list. */
689     if (list_is_empty(&dst->dst_arcs)) {
690         return true;
691     }
692     arc = CONTAINER_OF(dst->dst_arcs.next, struct ovsdb_idl_arc, dst_node);
693     return arc->src != src;
694 }
695
696 static struct ovsdb_idl_table *
697 ovsdb_idl_table_from_class(const struct ovsdb_idl *idl,
698                            const struct ovsdb_idl_table_class *table_class)
699 {
700     return &idl->tables[table_class - idl->class->tables];
701 }
702
703 struct ovsdb_idl_row *
704 ovsdb_idl_get_row_arc(struct ovsdb_idl_row *src,
705                       struct ovsdb_idl_table_class *dst_table_class,
706                       const struct uuid *dst_uuid)
707 {
708     struct ovsdb_idl *idl = src->table->idl;
709     struct ovsdb_idl_table *dst_table;
710     struct ovsdb_idl_arc *arc;
711     struct ovsdb_idl_row *dst;
712
713     dst_table = ovsdb_idl_table_from_class(idl, dst_table_class);
714     dst = ovsdb_idl_get_row(dst_table, dst_uuid);
715     if (!dst) {
716         dst = ovsdb_idl_row_create(dst_table, dst_uuid);
717     }
718
719     /* Add a new arc, if it wouldn't be a self-arc or a duplicate arc. */
720     if (may_add_arc(src, dst)) {
721         /* The arc *must* be added at the front of the dst_arcs list.  See
722          * ovsdb_idl_row_reparse_backrefs() for details. */
723         arc = xmalloc(sizeof *arc);
724         list_push_front(&src->src_arcs, &arc->src_node);
725         list_push_front(&dst->dst_arcs, &arc->dst_node);
726         arc->src = src;
727         arc->dst = dst;
728     }
729
730     return !ovsdb_idl_row_is_orphan(dst) ? dst : NULL;
731 }
732
733 static struct ovsdb_idl_row *
734 next_real_row(struct ovsdb_idl_table *table, struct hmap_node *node)
735 {
736     for (; node; node = hmap_next(&table->rows, node)) {
737         struct ovsdb_idl_row *row;
738
739         row = CONTAINER_OF(node, struct ovsdb_idl_row, hmap_node);
740         if (!ovsdb_idl_row_is_orphan(row)) {
741             return row;
742         }
743     }
744     return NULL;
745 }
746
747 struct ovsdb_idl_row *
748 ovsdb_idl_first_row(const struct ovsdb_idl *idl,
749                     const struct ovsdb_idl_table_class *table_class)
750 {
751     struct ovsdb_idl_table *table
752         = ovsdb_idl_table_from_class(idl, table_class);
753     return next_real_row(table, hmap_first(&table->rows));
754 }
755
756 struct ovsdb_idl_row *
757 ovsdb_idl_next_row(const struct ovsdb_idl_row *row)
758 {
759     struct ovsdb_idl_table *table = row->table;
760
761     return next_real_row(table, hmap_next(&table->rows, &row->hmap_node));
762 }
763 \f
764 /* Transactions. */
765
766 static void ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
767                                    enum ovsdb_idl_txn_status);
768
769 const char *
770 ovsdb_idl_txn_status_to_string(enum ovsdb_idl_txn_status status)
771 {
772     switch (status) {
773     case TXN_INCOMPLETE:
774         return "incomplete";
775     case TXN_ABORTED:
776         return "aborted";
777     case TXN_SUCCESS:
778         return "success";
779     case TXN_TRY_AGAIN:
780         return "try again";
781     case TXN_ERROR:
782         return "error";
783     }
784     return "<unknown>";
785 }
786
787 struct ovsdb_idl_txn *
788 ovsdb_idl_txn_create(struct ovsdb_idl *idl)
789 {
790     struct ovsdb_idl_txn *txn;
791
792     assert(!idl->txn);
793     idl->txn = txn = xmalloc(sizeof *txn);
794     txn->idl = idl;
795     txn->status = TXN_INCOMPLETE;
796     hmap_init(&txn->txn_rows);
797     txn->dry_run = false;
798     ds_init(&txn->comment);
799     return txn;
800 }
801
802 void
803 ovsdb_idl_txn_add_comment(struct ovsdb_idl_txn *txn, const char *s)
804 {
805     if (txn->comment.length) {
806         ds_put_char(&txn->comment, '\n');
807     }
808     ds_put_cstr(&txn->comment, s);
809 }
810
811 void
812 ovsdb_idl_txn_set_dry_run(struct ovsdb_idl_txn *txn)
813 {
814     txn->dry_run = true;
815 }
816
817 void
818 ovsdb_idl_txn_destroy(struct ovsdb_idl_txn *txn)
819 {
820     if (txn->status == TXN_INCOMPLETE) {
821         hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
822     }
823     ovsdb_idl_txn_abort(txn);
824     ds_destroy(&txn->comment);
825     free(txn);
826 }
827
828 void
829 ovsdb_idl_txn_wait(const struct ovsdb_idl_txn *txn)
830 {
831     if (txn->status != TXN_INCOMPLETE) {
832         poll_immediate_wake();
833     }
834 }
835
836 static struct json *
837 where_uuid_equals(const struct uuid *uuid)
838 {
839     return
840         json_array_create_1(
841             json_array_create_3(
842                 json_string_create("_uuid"),
843                 json_string_create("=="),
844                 json_array_create_2(
845                     json_string_create("uuid"),
846                     json_string_create_nocopy(
847                         xasprintf(UUID_FMT, UUID_ARGS(uuid))))));
848 }
849
850 static char *
851 uuid_name_from_uuid(const struct uuid *uuid)
852 {
853     char *name;
854     char *p;
855
856     name = xasprintf("row"UUID_FMT, UUID_ARGS(uuid));
857     for (p = name; *p != '\0'; p++) {
858         if (*p == '-') {
859             *p = '_';
860         }
861     }
862
863     return name;
864 }
865
866 static const struct ovsdb_idl_row *
867 ovsdb_idl_txn_get_row(const struct ovsdb_idl_txn *txn, const struct uuid *uuid)
868 {
869     const struct ovsdb_idl_row *row;
870
871     HMAP_FOR_EACH_WITH_HASH (row, struct ovsdb_idl_row, txn_node,
872                              uuid_hash(uuid), &txn->txn_rows) {
873         if (uuid_equals(&row->uuid, uuid)) {
874             return row;
875         }
876     }
877     return NULL;
878 }
879
880 /* XXX there must be a cleaner way to do this */
881 static struct json *
882 substitute_uuids(struct json *json, const struct ovsdb_idl_txn *txn)
883 {
884     if (json->type == JSON_ARRAY) {
885         struct uuid uuid;
886         size_t i;
887
888         if (json->u.array.n == 2
889             && json->u.array.elems[0]->type == JSON_STRING
890             && json->u.array.elems[1]->type == JSON_STRING
891             && !strcmp(json->u.array.elems[0]->u.string, "uuid")
892             && uuid_from_string(&uuid, json->u.array.elems[1]->u.string)) {
893             const struct ovsdb_idl_row *row;
894
895             row = ovsdb_idl_txn_get_row(txn, &uuid);
896             if (row && !row->old && row->new) {
897                 json_destroy(json);
898
899                 return json_array_create_2(
900                     json_string_create("named-uuid"),
901                     json_string_create_nocopy(uuid_name_from_uuid(&uuid)));
902             }
903         }
904
905         for (i = 0; i < json->u.array.n; i++) {
906             json->u.array.elems[i] = substitute_uuids(json->u.array.elems[i],
907                                                       txn);
908         }
909     } else if (json->type == JSON_OBJECT) {
910         struct shash_node *node;
911
912         SHASH_FOR_EACH (node, json_object(json)) {
913             node->data = substitute_uuids(node->data, txn);
914         }
915     }
916     return json;
917 }
918
919 static void
920 ovsdb_idl_txn_disassemble(struct ovsdb_idl_txn *txn)
921 {
922     struct ovsdb_idl_row *row, *next;
923
924     HMAP_FOR_EACH_SAFE (row, next, struct ovsdb_idl_row, txn_node,
925                         &txn->txn_rows) {
926         if (row->old && row->written) {
927             (row->table->class->unparse)(row);
928             ovsdb_idl_row_clear_arcs(row, false);
929             (row->table->class->parse)(row);
930         }
931         ovsdb_idl_row_clear_new(row);
932
933         free(row->prereqs);
934         row->prereqs = NULL;
935
936         free(row->written);
937         row->written = NULL;
938
939         hmap_remove(&txn->txn_rows, &row->txn_node);
940         hmap_node_nullify(&row->txn_node);
941     }
942     hmap_destroy(&txn->txn_rows);
943     hmap_init(&txn->txn_rows);
944 }
945
946 enum ovsdb_idl_txn_status
947 ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
948 {
949     struct ovsdb_idl_row *row;
950     struct json *operations;
951     bool any_updates;
952
953     if (txn != txn->idl->txn) {
954         return txn->status;
955     }
956
957     operations = json_array_create_empty();
958
959     /* Add prerequisites and declarations of new rows. */
960     HMAP_FOR_EACH (row, struct ovsdb_idl_row, txn_node, &txn->txn_rows) {
961         /* XXX check that deleted rows exist even if no prereqs? */
962         if (row->prereqs) {
963             const struct ovsdb_idl_table_class *class = row->table->class;
964             size_t n_columns = class->n_columns;
965             struct json *op, *columns, *row_json;
966             size_t idx;
967
968             op = json_object_create();
969             json_array_add(operations, op);
970             json_object_put_string(op, "op", "wait");
971             json_object_put_string(op, "table", class->name);
972             json_object_put(op, "timeout", json_integer_create(0));
973             json_object_put(op, "where", where_uuid_equals(&row->uuid));
974             json_object_put_string(op, "until", "==");
975             columns = json_array_create_empty();
976             json_object_put(op, "columns", columns);
977             row_json = json_object_create();
978             json_object_put(op, "rows", json_array_create_1(row_json));
979
980             BITMAP_FOR_EACH_1 (idx, n_columns, row->prereqs) {
981                 const struct ovsdb_idl_column *column = &class->columns[idx];
982                 json_array_add(columns, json_string_create(column->name));
983                 json_object_put(row_json, column->name,
984                                 ovsdb_datum_to_json(&row->old[idx],
985                                                     &column->type));
986             }
987         }
988         if (row->new && !row->old) {
989             struct json *op;
990
991             op = json_object_create();
992             json_array_add(operations, op);
993             json_object_put_string(op, "op", "declare");
994             json_object_put(op, "uuid-name",
995                             json_string_create_nocopy(
996                                 uuid_name_from_uuid(&row->uuid)));
997         }
998     }
999
1000     /* Add updates. */
1001     any_updates = false;
1002     HMAP_FOR_EACH (row, struct ovsdb_idl_row, txn_node, &txn->txn_rows) {
1003         const struct ovsdb_idl_table_class *class = row->table->class;
1004
1005         if (row->old == row->new) {
1006             continue;
1007         } else if (!row->new) {
1008             struct json *op = json_object_create();
1009             json_object_put_string(op, "op", "delete");
1010             json_object_put_string(op, "table", class->name);
1011             json_object_put(op, "where", where_uuid_equals(&row->uuid));
1012             json_array_add(operations, op);
1013             any_updates = true;
1014         } else {
1015             struct json *row_json;
1016             struct json *op;
1017             size_t idx;
1018
1019             op = json_object_create();
1020             json_object_put_string(op, "op", row->old ? "update" : "insert");
1021             json_object_put_string(op, "table", class->name);
1022             if (row->old) {
1023                 json_object_put(op, "where", where_uuid_equals(&row->uuid));
1024             } else {
1025                 json_object_put(op, "uuid-name",
1026                                 json_string_create_nocopy(
1027                                     uuid_name_from_uuid(&row->uuid)));
1028             }
1029             row_json = json_object_create();
1030             json_object_put(op, "row", row_json);
1031
1032             BITMAP_FOR_EACH_1 (idx, class->n_columns, row->written) {
1033                 const struct ovsdb_idl_column *column = &class->columns[idx];
1034
1035                 if (!row->old || !ovsdb_datum_equals(&row->old[idx],
1036                                                      &row->new[idx],
1037                                                      &column->type)) {
1038                     json_object_put(row_json, column->name,
1039                                     substitute_uuids(
1040                                         ovsdb_datum_to_json(&row->new[idx],
1041                                                             &column->type),
1042                                         txn));
1043                 }
1044             }
1045
1046             if (!row->old || !shash_is_empty(json_object(row_json))) {
1047                 json_array_add(operations, op);
1048                 any_updates = true;
1049             } else {
1050                 json_destroy(op);
1051             }
1052         }
1053     }
1054
1055     if (txn->comment.length) {
1056         struct json *op = json_object_create();
1057         json_object_put_string(op, "op", "comment");
1058         json_object_put_string(op, "comment", ds_cstr(&txn->comment));
1059         json_array_add(operations, op);
1060     }
1061
1062     if (txn->dry_run) {
1063         struct json *op = json_object_create();
1064         json_object_put_string(op, "op", "abort");
1065         json_array_add(operations, op);
1066     }
1067
1068     if (!any_updates) {
1069         txn->status = TXN_SUCCESS;
1070     } else if (!jsonrpc_session_send(
1071                    txn->idl->session,
1072                    jsonrpc_create_request(
1073                        "transact", operations, &txn->request_id))) {
1074         hmap_insert(&txn->idl->outstanding_txns, &txn->hmap_node,
1075                     json_hash(txn->request_id, 0));
1076     } else {
1077         txn->status = TXN_INCOMPLETE;
1078     }
1079
1080     txn->idl->txn = NULL;
1081     ovsdb_idl_txn_disassemble(txn);
1082     return txn->status;
1083 }
1084
1085 void
1086 ovsdb_idl_txn_abort(struct ovsdb_idl_txn *txn)
1087 {
1088     ovsdb_idl_txn_disassemble(txn);
1089     if (txn->status == TXN_INCOMPLETE) {
1090         txn->status = TXN_ABORTED;
1091     }
1092 }
1093
1094 static void
1095 ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
1096                        enum ovsdb_idl_txn_status status)
1097 {
1098     txn->status = status;
1099     hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
1100 }
1101
1102 void
1103 ovsdb_idl_txn_write(struct ovsdb_idl_row *row,
1104                     const struct ovsdb_idl_column *column,
1105                     struct ovsdb_datum *datum)
1106 {
1107     const struct ovsdb_idl_table_class *class = row->table->class;
1108     size_t column_idx = column - class->columns;
1109
1110     assert(row->new);
1111     if (hmap_node_is_null(&row->txn_node)) {
1112         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
1113                     uuid_hash(&row->uuid));
1114     }
1115     if (row->old == row->new) {
1116         row->new = xmalloc(class->n_columns * sizeof *row->new);
1117     }
1118     if (!row->written) {
1119         row->written = bitmap_allocate(class->n_columns);
1120     }
1121     if (bitmap_is_set(row->written, column_idx)) {
1122         ovsdb_datum_destroy(&row->new[column_idx], &column->type);
1123     } else {
1124         bitmap_set1(row->written, column_idx);
1125     }
1126     row->new[column_idx] = *datum;
1127 }
1128
1129 void
1130 ovsdb_idl_txn_verify(const struct ovsdb_idl_row *row_,
1131                      const struct ovsdb_idl_column *column)
1132 {
1133     struct ovsdb_idl_row *row = (struct ovsdb_idl_row *) row_;
1134     const struct ovsdb_idl_table_class *class = row->table->class;
1135     size_t column_idx = column - class->columns;
1136
1137     assert(row->new);
1138     if (!row->old
1139         || (row->written && bitmap_is_set(row->written, column_idx))) {
1140         return;
1141     }
1142
1143     if (hmap_node_is_null(&row->txn_node)) {
1144         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
1145                     uuid_hash(&row->uuid));
1146     }
1147     if (!row->prereqs) {
1148         row->prereqs = bitmap_allocate(class->n_columns);
1149     }
1150     bitmap_set1(row->prereqs, column_idx);
1151 }
1152
1153 void
1154 ovsdb_idl_txn_delete(struct ovsdb_idl_row *row)
1155 {
1156     assert(row->new);
1157     if (!row->old) {
1158         ovsdb_idl_row_clear_new(row);
1159         assert(!row->prereqs);
1160         hmap_remove(&row->table->idl->txn->txn_rows, &row->txn_node);
1161         free(row);
1162     }
1163     if (hmap_node_is_null(&row->txn_node)) {
1164         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
1165                     uuid_hash(&row->uuid));
1166     }
1167     ovsdb_idl_row_clear_new(row);
1168     row->new = NULL;
1169 }
1170
1171 struct ovsdb_idl_row *
1172 ovsdb_idl_txn_insert(struct ovsdb_idl_txn *txn,
1173                      const struct ovsdb_idl_table_class *class)
1174 {
1175     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(class);
1176     uuid_generate(&row->uuid);
1177     row->table = ovsdb_idl_table_from_class(txn->idl, class);
1178     row->new = xmalloc(class->n_columns * sizeof *row->new);
1179     row->written = bitmap_allocate(class->n_columns);
1180     hmap_insert(&txn->txn_rows, &row->txn_node, uuid_hash(&row->uuid));
1181     return row;
1182 }
1183
1184 static void
1185 ovsdb_idl_txn_abort_all(struct ovsdb_idl *idl)
1186 {
1187     struct ovsdb_idl_txn *txn;
1188
1189     HMAP_FOR_EACH (txn, struct ovsdb_idl_txn, hmap_node,
1190                    &idl->outstanding_txns) {
1191         ovsdb_idl_txn_complete(txn, TXN_TRY_AGAIN);
1192     }
1193 }
1194
1195 static struct ovsdb_idl_txn *
1196 ovsdb_idl_txn_find(struct ovsdb_idl *idl, const struct json *id)
1197 {
1198     struct ovsdb_idl_txn *txn;
1199
1200     HMAP_FOR_EACH_WITH_HASH (txn, struct ovsdb_idl_txn, hmap_node,
1201                              json_hash(id, 0), &idl->outstanding_txns) {
1202         if (json_equal(id, txn->request_id)) {
1203             return txn;
1204         }
1205     }
1206     return NULL;
1207 }
1208
1209 static bool
1210 ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
1211                             const struct jsonrpc_msg *msg)
1212 {
1213     struct ovsdb_idl_txn *txn;
1214     enum ovsdb_idl_txn_status status;
1215
1216     txn = ovsdb_idl_txn_find(idl, msg->id);
1217     if (!txn) {
1218         return false;
1219     }
1220
1221     if (msg->type == JSONRPC_ERROR) {
1222         status = TXN_ERROR;
1223     } else if (msg->result->type != JSON_ARRAY) {
1224         VLOG_WARN_RL(&syntax_rl, "reply to \"transact\" is not JSON array");
1225         status = TXN_ERROR;
1226     } else {
1227         int hard_errors = 0;
1228         int soft_errors = 0;
1229         size_t i;
1230
1231         for (i = 0; i < msg->result->u.array.n; i++) {
1232             struct json *json = msg->result->u.array.elems[i];
1233
1234             if (json->type == JSON_NULL) {
1235                 /* This isn't an error in itself but indicates that some prior
1236                  * operation failed, so make sure that we know about it. */
1237                 soft_errors++;
1238             } else if (json->type == JSON_OBJECT) {
1239                 struct json *error;
1240
1241                 error = shash_find_data(json_object(json), "error");
1242                 if (error) {
1243                     if (error->type == JSON_STRING) {
1244                         if (!strcmp(error->u.string, "timed out")) {
1245                             soft_errors++;
1246                         } else if (strcmp(error->u.string, "aborted")) {
1247                             hard_errors++;
1248                         }
1249                     } else {
1250                         hard_errors++;
1251                         VLOG_WARN_RL(&syntax_rl,
1252                                      "\"error\" in reply is not JSON string");
1253                     }
1254                 }
1255             } else {
1256                 hard_errors++;
1257                 VLOG_WARN_RL(&syntax_rl,
1258                              "operation reply is not JSON null or object");
1259             }
1260         }
1261
1262         status = (hard_errors ? TXN_ERROR
1263                   : soft_errors ? TXN_TRY_AGAIN
1264                   : TXN_SUCCESS);
1265     }
1266
1267     ovsdb_idl_txn_complete(txn, status);
1268     return true;
1269 }
1270
1271 struct ovsdb_idl_txn *
1272 ovsdb_idl_txn_get(const struct ovsdb_idl_row *row)
1273 {
1274     struct ovsdb_idl_txn *txn = row->table->idl->txn;
1275     assert(txn != NULL);
1276     return txn;
1277 }