ovsdb-idl: New function to obtain the current transaction from any row.
[sliver-openvswitch.git] / lib / ovsdb-idl.c
1 /* Copyright (c) 2009 Nicira Networks.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "ovsdb-idl.h"
19
20 #include <assert.h>
21 #include <errno.h>
22 #include <limits.h>
23 #include <stdlib.h>
24
25 #include "bitmap.h"
26 #include "json.h"
27 #include "jsonrpc.h"
28 #include "ovsdb-data.h"
29 #include "ovsdb-error.h"
30 #include "ovsdb-idl-provider.h"
31 #include "shash.h"
32 #include "util.h"
33
34 #define THIS_MODULE VLM_ovsdb_idl
35 #include "vlog.h"
36
37 /* An arc from one idl_row to another.  When row A contains a UUID that
38  * references row B, this is represented by an arc from A (the source) to B
39  * (the destination).
40  *
41  * Arcs from a row to itself are omitted, that is, src and dst are always
42  * different.
43  *
44  * Arcs are never duplicated, that is, even if there are multiple references
45  * from A to B, there is only a single arc from A to B.
46  *
47  * Arcs are directed: an arc from A to B is the converse of an an arc from B to
48  * A.  Both an arc and its converse may both be present, if each row refers
49  * to the other circularly.
50  *
51  * The source and destination row may be in the same table or in different
52  * tables.
53  */
54 struct ovsdb_idl_arc {
55     struct list src_node;       /* In src->src_arcs list. */
56     struct list dst_node;       /* In dst->dst_arcs list. */
57     struct ovsdb_idl_row *src;  /* Source row. */
58     struct ovsdb_idl_row *dst;  /* Destination row. */
59 };
60
61 struct ovsdb_idl {
62     const struct ovsdb_idl_class *class;
63     struct jsonrpc_session *session;
64     struct shash table_by_name;
65     struct ovsdb_idl_table *tables;
66     struct json *monitor_request_id;
67     unsigned int last_monitor_request_seqno;
68     unsigned int change_seqno;
69
70     /* Transaction support. */
71     struct ovsdb_idl_txn *txn;
72     struct hmap outstanding_txns;
73 };
74
75 struct ovsdb_idl_txn {
76     struct hmap_node hmap_node;
77     struct json *request_id;
78     struct ovsdb_idl *idl;
79     struct hmap txn_rows;
80     enum ovsdb_idl_txn_status status;
81 };
82
83 static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);
84 static struct vlog_rate_limit semantic_rl = VLOG_RATE_LIMIT_INIT(1, 5);
85
86 static void ovsdb_idl_clear(struct ovsdb_idl *);
87 static void ovsdb_idl_send_monitor_request(struct ovsdb_idl *);
88 static void ovsdb_idl_parse_update(struct ovsdb_idl *, const struct json *);
89 static struct ovsdb_error *ovsdb_idl_parse_update__(struct ovsdb_idl *,
90                                                     const struct json *);
91 static void ovsdb_idl_process_update(struct ovsdb_idl_table *,
92                                      const struct uuid *,
93                                      const struct json *old,
94                                      const struct json *new);
95 static void ovsdb_idl_insert_row(struct ovsdb_idl_row *, const struct json *);
96 static void ovsdb_idl_delete_row(struct ovsdb_idl_row *);
97 static void ovsdb_idl_modify_row(struct ovsdb_idl_row *, const struct json *);
98
99 static bool ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *);
100 static struct ovsdb_idl_row *ovsdb_idl_row_create__(
101     const struct ovsdb_idl_table_class *);
102 static struct ovsdb_idl_row *ovsdb_idl_row_create(struct ovsdb_idl_table *,
103                                                   const struct uuid *);
104 static void ovsdb_idl_row_destroy(struct ovsdb_idl_row *);
105
106 static void ovsdb_idl_row_clear_old(struct ovsdb_idl_row *);
107 static void ovsdb_idl_row_clear_new(struct ovsdb_idl_row *);
108
109 static void ovsdb_idl_txn_abort_all(struct ovsdb_idl *);
110 static bool ovsdb_idl_txn_process_reply(struct ovsdb_idl *,
111                                         const struct jsonrpc_msg *msg);
112
113 struct ovsdb_idl *
114 ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class)
115 {
116     struct ovsdb_idl *idl;
117     size_t i;
118
119     idl = xzalloc(sizeof *idl);
120     idl->class = class;
121     idl->session = jsonrpc_session_open(remote);
122     shash_init(&idl->table_by_name);
123     idl->tables = xmalloc(class->n_tables * sizeof *idl->tables);
124     for (i = 0; i < class->n_tables; i++) {
125         const struct ovsdb_idl_table_class *tc = &class->tables[i];
126         struct ovsdb_idl_table *table = &idl->tables[i];
127         size_t j;
128
129         assert(!shash_find(&idl->table_by_name, tc->name));
130         shash_add(&idl->table_by_name, tc->name, table);
131         table->class = tc;
132         shash_init(&table->columns);
133         for (j = 0; j < tc->n_columns; j++) {
134             const struct ovsdb_idl_column *column = &tc->columns[j];
135
136             assert(!shash_find(&table->columns, column->name));
137             shash_add(&table->columns, column->name, column);
138         }
139         hmap_init(&table->rows);
140         table->idl = idl;
141     }
142     idl->last_monitor_request_seqno = UINT_MAX;
143     hmap_init(&idl->outstanding_txns);
144
145     return idl;
146 }
147
148 void
149 ovsdb_idl_destroy(struct ovsdb_idl *idl)
150 {
151     if (idl) {
152         size_t i;
153
154         assert(!idl->txn);
155         ovsdb_idl_clear(idl);
156         jsonrpc_session_close(idl->session);
157
158         for (i = 0; i < idl->class->n_tables; i++) {
159             struct ovsdb_idl_table *table = &idl->tables[i];
160             shash_destroy(&table->columns);
161             hmap_destroy(&table->rows);
162         }
163         shash_destroy(&idl->table_by_name);
164         free(idl->tables);
165         json_destroy(idl->monitor_request_id);
166         free(idl);
167     }
168 }
169
170 static void
171 ovsdb_idl_clear(struct ovsdb_idl *idl)
172 {
173     bool changed = false;
174     size_t i;
175
176     for (i = 0; i < idl->class->n_tables; i++) {
177         struct ovsdb_idl_table *table = &idl->tables[i];
178         struct ovsdb_idl_row *row, *next_row;
179
180         if (hmap_is_empty(&table->rows)) {
181             continue;
182         }
183
184         changed = true;
185         HMAP_FOR_EACH_SAFE (row, next_row, struct ovsdb_idl_row, hmap_node,
186                             &table->rows) {
187             struct ovsdb_idl_arc *arc, *next_arc;
188
189             if (!ovsdb_idl_row_is_orphan(row)) {
190                 (row->table->class->unparse)(row);
191                 ovsdb_idl_row_clear_old(row);
192             }
193             hmap_remove(&table->rows, &row->hmap_node);
194             LIST_FOR_EACH_SAFE (arc, next_arc, struct ovsdb_idl_arc, src_node,
195                                 &row->src_arcs) {
196                 free(arc);
197             }
198             /* No need to do anything with dst_arcs: some node has those arcs
199              * as forward arcs and will destroy them itself. */
200
201             free(row);
202         }
203     }
204
205     if (changed) {
206         idl->change_seqno++;
207     }
208 }
209
210 void
211 ovsdb_idl_run(struct ovsdb_idl *idl)
212 {
213     int i;
214
215     assert(!idl->txn);
216     jsonrpc_session_run(idl->session);
217     for (i = 0; jsonrpc_session_is_connected(idl->session) && i < 50; i++) {
218         struct jsonrpc_msg *msg, *reply;
219         unsigned int seqno;
220
221         seqno = jsonrpc_session_get_seqno(idl->session);
222         if (idl->last_monitor_request_seqno != seqno) {
223             idl->last_monitor_request_seqno = seqno;
224             ovsdb_idl_txn_abort_all(idl);
225             ovsdb_idl_send_monitor_request(idl);
226             break;
227         }
228
229         msg = jsonrpc_session_recv(idl->session);
230         if (!msg) {
231             break;
232         }
233
234         reply = NULL;
235         if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
236             reply = jsonrpc_create_reply(json_clone(msg->params), msg->id);
237         } else if (msg->type == JSONRPC_NOTIFY
238                    && !strcmp(msg->method, "update")
239                    && msg->params->type == JSON_ARRAY
240                    && msg->params->u.array.n == 2
241                    && msg->params->u.array.elems[0]->type == JSON_NULL) {
242             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1]);
243         } else if (msg->type == JSONRPC_REPLY
244                    && idl->monitor_request_id
245                    && json_equal(idl->monitor_request_id, msg->id)) {
246             json_destroy(idl->monitor_request_id);
247             idl->monitor_request_id = NULL;
248             ovsdb_idl_clear(idl);
249             ovsdb_idl_parse_update(idl, msg->result);
250         } else if (msg->type == JSONRPC_REPLY
251                    && msg->id && msg->id->type == JSON_STRING
252                    && !strcmp(msg->id->u.string, "echo")) {
253             /* It's a reply to our echo request.  Ignore it. */
254         } else if ((msg->type == JSONRPC_ERROR
255                     || msg->type == JSONRPC_REPLY)
256                    && ovsdb_idl_txn_process_reply(idl, msg)) {
257             /* ovsdb_idl_txn_process_reply() did everything needful. */
258         } else {
259             VLOG_WARN("%s: received unexpected %s message",
260                       jsonrpc_session_get_name(idl->session),
261                       jsonrpc_msg_type_to_string(msg->type));
262             jsonrpc_session_force_reconnect(idl->session);
263         }
264         if (reply) {
265             jsonrpc_session_send(idl->session, reply);
266         }
267         jsonrpc_msg_destroy(msg);
268     }
269 }
270
271 void
272 ovsdb_idl_wait(struct ovsdb_idl *idl)
273 {
274     jsonrpc_session_wait(idl->session);
275     jsonrpc_session_recv_wait(idl->session);
276 }
277
278 unsigned int
279 ovsdb_idl_get_seqno(const struct ovsdb_idl *idl)
280 {
281     return idl->change_seqno;
282 }
283
284 void
285 ovsdb_idl_force_reconnect(struct ovsdb_idl *idl)
286 {
287     jsonrpc_session_force_reconnect(idl->session);
288 }
289 \f
290 static void
291 ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl)
292 {
293     struct json *monitor_requests;
294     struct jsonrpc_msg *msg;
295     size_t i;
296
297     monitor_requests = json_object_create();
298     for (i = 0; i < idl->class->n_tables; i++) {
299         const struct ovsdb_idl_table *table = &idl->tables[i];
300         const struct ovsdb_idl_table_class *tc = table->class;
301         struct json *monitor_request, *columns;
302         size_t i;
303
304         monitor_request = json_object_create();
305         columns = json_array_create_empty();
306         for (i = 0; i < tc->n_columns; i++) {
307             const struct ovsdb_idl_column *column = &tc->columns[i];
308             json_array_add(columns, json_string_create(column->name));
309         }
310         json_object_put(monitor_request, "columns", columns);
311         json_object_put(monitor_requests, tc->name, monitor_request);
312     }
313
314     json_destroy(idl->monitor_request_id);
315     msg = jsonrpc_create_request(
316         "monitor", json_array_create_2(json_null_create(), monitor_requests),
317         &idl->monitor_request_id);
318     jsonrpc_session_send(idl->session, msg);
319 }
320
321 static void
322 ovsdb_idl_parse_update(struct ovsdb_idl *idl, const struct json *table_updates)
323 {
324     struct ovsdb_error *error;
325
326     idl->change_seqno++;
327
328     error = ovsdb_idl_parse_update__(idl, table_updates);
329     if (error) {
330         if (!VLOG_DROP_WARN(&syntax_rl)) {
331             char *s = ovsdb_error_to_string(error);
332             VLOG_WARN_RL(&syntax_rl, "%s", s);
333             free(s);
334         }
335         ovsdb_error_destroy(error);
336     }
337 }
338
339 static struct ovsdb_error *
340 ovsdb_idl_parse_update__(struct ovsdb_idl *idl,
341                          const struct json *table_updates)
342 {
343     const struct shash_node *tables_node;
344
345     if (table_updates->type != JSON_OBJECT) {
346         return ovsdb_syntax_error(table_updates, NULL,
347                                   "<table-updates> is not an object");
348     }
349     SHASH_FOR_EACH (tables_node, json_object(table_updates)) {
350         const struct json *table_update = tables_node->data;
351         const struct shash_node *table_node;
352         struct ovsdb_idl_table *table;
353
354         table = shash_find_data(&idl->table_by_name, tables_node->name);
355         if (!table) {
356             return ovsdb_syntax_error(
357                 table_updates, NULL,
358                 "<table-updates> includes unknown table \"%s\"",
359                 tables_node->name);
360         }
361
362         if (table_update->type != JSON_OBJECT) {
363             return ovsdb_syntax_error(table_update, NULL,
364                                       "<table-update> for table \"%s\" is "
365                                       "not an object", table->class->name);
366         }
367         SHASH_FOR_EACH (table_node, json_object(table_update)) {
368             const struct json *row_update = table_node->data;
369             const struct json *old_json, *new_json;
370             struct uuid uuid;
371
372             if (!uuid_from_string(&uuid, table_node->name)) {
373                 return ovsdb_syntax_error(table_update, NULL,
374                                           "<table-update> for table \"%s\" "
375                                           "contains bad UUID "
376                                           "\"%s\" as member name",
377                                           table->class->name,
378                                           table_node->name);
379             }
380             if (row_update->type != JSON_OBJECT) {
381                 return ovsdb_syntax_error(row_update, NULL,
382                                           "<table-update> for table \"%s\" "
383                                           "contains <row-update> for %s that "
384                                           "is not an object",
385                                           table->class->name,
386                                           table_node->name);
387             }
388
389             old_json = shash_find_data(json_object(row_update), "old");
390             new_json = shash_find_data(json_object(row_update), "new");
391             if (old_json && old_json->type != JSON_OBJECT) {
392                 return ovsdb_syntax_error(old_json, NULL,
393                                           "\"old\" <row> is not object");
394             } else if (new_json && new_json->type != JSON_OBJECT) {
395                 return ovsdb_syntax_error(new_json, NULL,
396                                           "\"new\" <row> is not object");
397             } else if ((old_json != NULL) + (new_json != NULL)
398                        != shash_count(json_object(row_update))) {
399                 return ovsdb_syntax_error(row_update, NULL,
400                                           "<row-update> contains unexpected "
401                                           "member");
402             } else if (!old_json && !new_json) {
403                 return ovsdb_syntax_error(row_update, NULL,
404                                           "<row-update> missing \"old\" "
405                                           "and \"new\" members");
406             }
407
408             ovsdb_idl_process_update(table, &uuid, old_json, new_json);
409         }
410     }
411
412     return NULL;
413 }
414
415 static struct ovsdb_idl_row *
416 ovsdb_idl_get_row(struct ovsdb_idl_table *table, const struct uuid *uuid)
417 {
418     struct ovsdb_idl_row *row;
419
420     HMAP_FOR_EACH_WITH_HASH (row, struct ovsdb_idl_row, hmap_node,
421                              uuid_hash(uuid), &table->rows) {
422         if (uuid_equals(&row->uuid, uuid)) {
423             return row;
424         }
425     }
426     return NULL;
427 }
428
429 static void
430 ovsdb_idl_process_update(struct ovsdb_idl_table *table,
431                          const struct uuid *uuid, const struct json *old,
432                          const struct json *new)
433 {
434     struct ovsdb_idl_row *row;
435
436     row = ovsdb_idl_get_row(table, uuid);
437     if (!new) {
438         /* Delete row. */
439         if (row && !ovsdb_idl_row_is_orphan(row)) {
440             /* XXX perhaps we should check the 'old' values? */
441             ovsdb_idl_delete_row(row);
442         } else {
443             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
444                          "from table %s",
445                          UUID_ARGS(uuid), table->class->name);
446         }
447     } else if (!old) {
448         /* Insert row. */
449         if (!row) {
450             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
451         } else if (ovsdb_idl_row_is_orphan(row)) {
452             ovsdb_idl_insert_row(row, new);
453         } else {
454             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
455                          "table %s", UUID_ARGS(uuid), table->class->name);
456             ovsdb_idl_modify_row(row, new);
457         }
458     } else {
459         /* Modify row. */
460         if (row) {
461             /* XXX perhaps we should check the 'old' values? */
462             if (!ovsdb_idl_row_is_orphan(row)) {
463                 ovsdb_idl_modify_row(row, new);
464             } else {
465                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
466                              "referenced row "UUID_FMT" in table %s",
467                              UUID_ARGS(uuid), table->class->name);
468                 ovsdb_idl_insert_row(row, new);
469             }
470         } else {
471             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
472                          "in table %s", UUID_ARGS(uuid), table->class->name);
473             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
474         }
475     }
476 }
477
478 static void
479 ovsdb_idl_row_update(struct ovsdb_idl_row *row, const struct json *row_json)
480 {
481     struct ovsdb_idl_table *table = row->table;
482     struct shash_node *node;
483
484     SHASH_FOR_EACH (node, json_object(row_json)) {
485         const char *column_name = node->name;
486         const struct ovsdb_idl_column *column;
487         struct ovsdb_datum datum;
488         struct ovsdb_error *error;
489
490         column = shash_find_data(&table->columns, column_name);
491         if (!column) {
492             VLOG_WARN_RL(&syntax_rl, "unknown column %s updating row "UUID_FMT,
493                          column_name, UUID_ARGS(&row->uuid));
494             continue;
495         }
496
497         error = ovsdb_datum_from_json(&datum, &column->type, node->data, NULL);
498         if (!error) {
499             ovsdb_datum_swap(&row->old[column - table->class->columns],
500                              &datum);
501             ovsdb_datum_destroy(&datum, &column->type);
502         } else {
503             char *s = ovsdb_error_to_string(error);
504             VLOG_WARN_RL(&syntax_rl, "error parsing column %s in row "UUID_FMT
505                          " in table %s: %s", column_name,
506                          UUID_ARGS(&row->uuid), table->class->name, s);
507             free(s);
508             ovsdb_error_destroy(error);
509         }
510     }
511 }
512
513 static bool
514 ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *row)
515 {
516     return !row->old;
517 }
518
519 static void
520 ovsdb_idl_row_clear_old(struct ovsdb_idl_row *row)
521 {
522     assert(row->old == row->new);
523     if (!ovsdb_idl_row_is_orphan(row)) {
524         const struct ovsdb_idl_table_class *class = row->table->class;
525         size_t i;
526
527         for (i = 0; i < class->n_columns; i++) {
528             ovsdb_datum_destroy(&row->old[i], &class->columns[i].type);
529         }
530         free(row->old);
531         row->old = row->new = NULL;
532     }
533 }
534
535 static void
536 ovsdb_idl_row_clear_new(struct ovsdb_idl_row *row)
537 {
538     if (row->old != row->new) {
539         if (row->new) {
540             const struct ovsdb_idl_table_class *class = row->table->class;
541             size_t i;
542
543             BITMAP_FOR_EACH_1 (i, class->n_columns, row->written) {
544                 ovsdb_datum_destroy(&row->new[i], &class->columns[i].type);
545             }
546             free(row->new);
547             free(row->written);
548             row->written = NULL;
549         }
550         row->new = row->old;
551     }
552 }
553
554 static void
555 ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *row, bool destroy_dsts)
556 {
557     struct ovsdb_idl_arc *arc, *next;
558
559     /* Delete all forward arcs.  If 'destroy_dsts', destroy any orphaned rows
560      * that this causes to be unreferenced. */
561     LIST_FOR_EACH_SAFE (arc, next, struct ovsdb_idl_arc, src_node,
562                         &row->src_arcs) {
563         list_remove(&arc->dst_node);
564         if (destroy_dsts
565             && ovsdb_idl_row_is_orphan(arc->dst)
566             && list_is_empty(&arc->dst->dst_arcs)) {
567             ovsdb_idl_row_destroy(arc->dst);
568         }
569         free(arc);
570     }
571     list_init(&row->src_arcs);
572 }
573
574 /* Force nodes that reference 'row' to reparse. */
575 static void
576 ovsdb_idl_row_reparse_backrefs(struct ovsdb_idl_row *row, bool destroy_dsts)
577 {
578     struct ovsdb_idl_arc *arc, *next;
579
580     /* This is trickier than it looks.  ovsdb_idl_row_clear_arcs() will destroy
581      * 'arc', so we need to use the "safe" variant of list traversal.  However,
582      * calling ref->table->class->parse will add an arc equivalent to 'arc' to
583      * row->arcs.  That could be a problem for traversal, but it adds it at the
584      * beginning of the list to prevent us from stumbling upon it again.
585      *
586      * (If duplicate arcs were possible then we would need to make sure that
587      * 'next' didn't also point into 'arc''s destination, but we forbid
588      * duplicate arcs.) */
589     LIST_FOR_EACH_SAFE (arc, next, struct ovsdb_idl_arc, dst_node,
590                         &row->dst_arcs) {
591         struct ovsdb_idl_row *ref = arc->src;
592
593         (ref->table->class->unparse)(ref);
594         ovsdb_idl_row_clear_arcs(ref, destroy_dsts);
595         (ref->table->class->parse)(ref);
596     }
597 }
598
599 static struct ovsdb_idl_row *
600 ovsdb_idl_row_create__(const struct ovsdb_idl_table_class *class)
601 {
602     struct ovsdb_idl_row *row = xzalloc(class->allocation_size);
603     memset(row, 0, sizeof *row);
604     list_init(&row->src_arcs);
605     list_init(&row->dst_arcs);
606     hmap_node_nullify(&row->txn_node);
607     return row;
608 }
609
610 static struct ovsdb_idl_row *
611 ovsdb_idl_row_create(struct ovsdb_idl_table *table, const struct uuid *uuid)
612 {
613     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(table->class);
614     hmap_insert(&table->rows, &row->hmap_node, uuid_hash(uuid));
615     row->uuid = *uuid;
616     row->table = table;
617     return row;
618 }
619
620 static void
621 ovsdb_idl_row_destroy(struct ovsdb_idl_row *row)
622 {
623     if (row) {
624         ovsdb_idl_row_clear_old(row);
625         hmap_remove(&row->table->rows, &row->hmap_node);
626         free(row);
627     }
628 }
629
630 static void
631 ovsdb_idl_insert_row(struct ovsdb_idl_row *row, const struct json *row_json)
632 {
633     const struct ovsdb_idl_table_class *class = row->table->class;
634     size_t i;
635
636     assert(!row->old && !row->new);
637     row->old = row->new = xmalloc(class->n_columns * sizeof *row->old);
638     for (i = 0; i < class->n_columns; i++) {
639         ovsdb_datum_init_default(&row->old[i], &class->columns[i].type);
640     }
641     ovsdb_idl_row_update(row, row_json);
642     (class->parse)(row);
643
644     ovsdb_idl_row_reparse_backrefs(row, false);
645 }
646
647 static void
648 ovsdb_idl_delete_row(struct ovsdb_idl_row *row)
649 {
650     (row->table->class->unparse)(row);
651     ovsdb_idl_row_clear_arcs(row, true);
652     ovsdb_idl_row_clear_old(row);
653     if (list_is_empty(&row->dst_arcs)) {
654         ovsdb_idl_row_destroy(row);
655     } else {
656         ovsdb_idl_row_reparse_backrefs(row, true);
657     }
658 }
659
660 static void
661 ovsdb_idl_modify_row(struct ovsdb_idl_row *row, const struct json *row_json)
662 {
663     (row->table->class->unparse)(row);
664     ovsdb_idl_row_clear_arcs(row, true);
665     ovsdb_idl_row_update(row, row_json);
666     (row->table->class->parse)(row);
667 }
668
669 static bool
670 may_add_arc(const struct ovsdb_idl_row *src, const struct ovsdb_idl_row *dst)
671 {
672     const struct ovsdb_idl_arc *arc;
673
674     /* No self-arcs. */
675     if (src == dst) {
676         return false;
677     }
678
679     /* No duplicate arcs.
680      *
681      * We only need to test whether the first arc in dst->dst_arcs originates
682      * at 'src', since we add all of the arcs from a given source in a clump
683      * (in a single call to a row's ->parse function) and new arcs are always
684      * added at the front of the dst_arcs list. */
685     if (list_is_empty(&dst->dst_arcs)) {
686         return true;
687     }
688     arc = CONTAINER_OF(dst->dst_arcs.next, struct ovsdb_idl_arc, dst_node);
689     return arc->src != src;
690 }
691
692 static struct ovsdb_idl_table *
693 ovsdb_idl_table_from_class(const struct ovsdb_idl *idl,
694                            const struct ovsdb_idl_table_class *table_class)
695 {
696     return &idl->tables[table_class - idl->class->tables];
697 }
698
699 struct ovsdb_idl_row *
700 ovsdb_idl_get_row_arc(struct ovsdb_idl_row *src,
701                       struct ovsdb_idl_table_class *dst_table_class,
702                       const struct uuid *dst_uuid)
703 {
704     struct ovsdb_idl *idl = src->table->idl;
705     struct ovsdb_idl_table *dst_table;
706     struct ovsdb_idl_arc *arc;
707     struct ovsdb_idl_row *dst;
708
709     dst_table = ovsdb_idl_table_from_class(idl, dst_table_class);
710     dst = ovsdb_idl_get_row(dst_table, dst_uuid);
711     if (!dst) {
712         dst = ovsdb_idl_row_create(dst_table, dst_uuid);
713     }
714
715     /* Add a new arc, if it wouldn't be a self-arc or a duplicate arc. */
716     if (may_add_arc(src, dst)) {
717         /* The arc *must* be added at the front of the dst_arcs list.  See
718          * ovsdb_idl_row_reparse_backrefs() for details. */
719         arc = xmalloc(sizeof *arc);
720         list_push_front(&src->src_arcs, &arc->src_node);
721         list_push_front(&dst->dst_arcs, &arc->dst_node);
722         arc->src = src;
723         arc->dst = dst;
724     }
725
726     return !ovsdb_idl_row_is_orphan(dst) ? dst : NULL;
727 }
728
729 static struct ovsdb_idl_row *
730 next_real_row(struct ovsdb_idl_table *table, struct hmap_node *node)
731 {
732     for (; node; node = hmap_next(&table->rows, node)) {
733         struct ovsdb_idl_row *row;
734
735         row = CONTAINER_OF(node, struct ovsdb_idl_row, hmap_node);
736         if (!ovsdb_idl_row_is_orphan(row)) {
737             return row;
738         }
739     }
740     return NULL;
741 }
742
743 struct ovsdb_idl_row *
744 ovsdb_idl_first_row(const struct ovsdb_idl *idl,
745                     const struct ovsdb_idl_table_class *table_class)
746 {
747     struct ovsdb_idl_table *table
748         = ovsdb_idl_table_from_class(idl, table_class);
749     return next_real_row(table, hmap_first(&table->rows));
750 }
751
752 struct ovsdb_idl_row *
753 ovsdb_idl_next_row(const struct ovsdb_idl_row *row)
754 {
755     struct ovsdb_idl_table *table = row->table;
756
757     return next_real_row(table, hmap_next(&table->rows, &row->hmap_node));
758 }
759 \f
760 /* Transactions. */
761
762 static void ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
763                                    enum ovsdb_idl_txn_status);
764
765 const char *
766 ovsdb_idl_txn_status_to_string(enum ovsdb_idl_txn_status status)
767 {
768     switch (status) {
769     case TXN_INCOMPLETE:
770         return "incomplete";
771     case TXN_ABORTED:
772         return "aborted";
773     case TXN_SUCCESS:
774         return "success";
775     case TXN_TRY_AGAIN:
776         return "try again";
777     case TXN_ERROR:
778         return "error";
779     }
780     return "<unknown>";
781 }
782
783 struct ovsdb_idl_txn *
784 ovsdb_idl_txn_create(struct ovsdb_idl *idl)
785 {
786     struct ovsdb_idl_txn *txn;
787
788     assert(!idl->txn);
789     idl->txn = txn = xmalloc(sizeof *txn);
790     txn->idl = idl;
791     txn->status = TXN_INCOMPLETE;
792     hmap_init(&txn->txn_rows);
793     return txn;
794 }
795
796 void
797 ovsdb_idl_txn_destroy(struct ovsdb_idl_txn *txn)
798 {
799     ovsdb_idl_txn_abort(txn);
800     free(txn);
801 }
802
803 static struct json *
804 where_uuid_equals(const struct uuid *uuid)
805 {
806     return
807         json_array_create_1(
808             json_array_create_3(
809                 json_string_create("_uuid"),
810                 json_string_create("=="),
811                 json_array_create_2(
812                     json_string_create("uuid"),
813                     json_string_create_nocopy(
814                         xasprintf(UUID_FMT, UUID_ARGS(uuid))))));
815 }
816
817 static char *
818 uuid_name_from_uuid(const struct uuid *uuid)
819 {
820     char *name;
821     char *p;
822
823     name = xasprintf("row"UUID_FMT, UUID_ARGS(uuid));
824     for (p = name; *p != '\0'; p++) {
825         if (*p == '-') {
826             *p = '_';
827         }
828     }
829
830     return name;
831 }
832
833 static const struct ovsdb_idl_row *
834 ovsdb_idl_txn_get_row(const struct ovsdb_idl_txn *txn, const struct uuid *uuid)
835 {
836     const struct ovsdb_idl_row *row;
837
838     HMAP_FOR_EACH_WITH_HASH (row, struct ovsdb_idl_row, txn_node,
839                              uuid_hash(uuid), &txn->txn_rows) {
840         if (uuid_equals(&row->uuid, uuid)) {
841             return row;
842         }
843     }
844     return NULL;
845 }
846
847 /* XXX there must be a cleaner way to do this */
848 static struct json *
849 substitute_uuids(struct json *json, const struct ovsdb_idl_txn *txn)
850 {
851     if (json->type == JSON_ARRAY) {
852         struct uuid uuid;
853         size_t i;
854
855         if (json->u.array.n == 2
856             && json->u.array.elems[0]->type == JSON_STRING
857             && json->u.array.elems[1]->type == JSON_STRING
858             && !strcmp(json->u.array.elems[0]->u.string, "uuid")
859             && uuid_from_string(&uuid, json->u.array.elems[1]->u.string)) {
860             const struct ovsdb_idl_row *row;
861
862             row = ovsdb_idl_txn_get_row(txn, &uuid);
863             if (row && !row->old && row->new) {
864                 json_destroy(json);
865
866                 return json_array_create_2(
867                     json_string_create("named-uuid"),
868                     json_string_create_nocopy(uuid_name_from_uuid(&uuid)));
869             }
870         }
871
872         for (i = 0; i < json->u.array.n; i++) {
873             json->u.array.elems[i] = substitute_uuids(json->u.array.elems[i],
874                                                       txn);
875         }
876     } else if (json->type == JSON_OBJECT) {
877         struct shash_node *node;
878
879         SHASH_FOR_EACH (node, json_object(json)) {
880             node->data = substitute_uuids(node->data, txn);
881         }
882     }
883     return json;
884 }
885
886 static void
887 ovsdb_idl_txn_disassemble(struct ovsdb_idl_txn *txn)
888 {
889     struct ovsdb_idl_row *row, *next;
890
891     HMAP_FOR_EACH_SAFE (row, next, struct ovsdb_idl_row, txn_node,
892                         &txn->txn_rows) {
893         if (row->old && row->written) {
894             (row->table->class->unparse)(row);
895             ovsdb_idl_row_clear_arcs(row, false);
896             (row->table->class->parse)(row);
897         }
898         ovsdb_idl_row_clear_new(row);
899
900         free(row->prereqs);
901         row->prereqs = NULL;
902
903         free(row->written);
904         row->written = NULL;
905
906         hmap_remove(&txn->txn_rows, &row->txn_node);
907         hmap_node_nullify(&row->txn_node);
908     }
909     hmap_destroy(&txn->txn_rows);
910     hmap_init(&txn->txn_rows);
911 }
912
913 enum ovsdb_idl_txn_status
914 ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
915 {
916     struct ovsdb_idl_row *row;
917     struct json *operations;
918     bool any_updates;
919
920     if (txn != txn->idl->txn) {
921         return txn->status;
922     }
923
924     operations = json_array_create_empty();
925
926     /* Add prerequisites and declarations of new rows. */
927     HMAP_FOR_EACH (row, struct ovsdb_idl_row, txn_node, &txn->txn_rows) {
928         /* XXX check that deleted rows exist even if no prereqs? */
929         if (row->prereqs) {
930             const struct ovsdb_idl_table_class *class = row->table->class;
931             size_t n_columns = class->n_columns;
932             struct json *op, *columns, *row_json;
933             size_t idx;
934
935             op = json_object_create();
936             json_array_add(operations, op);
937             json_object_put_string(op, "op", "wait");
938             json_object_put_string(op, "table", class->name);
939             json_object_put(op, "timeout", json_integer_create(0));
940             json_object_put(op, "where", where_uuid_equals(&row->uuid));
941             json_object_put_string(op, "until", "==");
942             columns = json_array_create_empty();
943             json_object_put(op, "columns", columns);
944             row_json = json_object_create();
945             json_object_put(op, "rows", json_array_create_1(row_json));
946
947             BITMAP_FOR_EACH_1 (idx, n_columns, row->prereqs) {
948                 const struct ovsdb_idl_column *column = &class->columns[idx];
949                 json_array_add(columns, json_string_create(column->name));
950                 json_object_put(row_json, column->name,
951                                 ovsdb_datum_to_json(&row->old[idx],
952                                                     &column->type));
953             }
954         }
955         if (row->new && !row->old) {
956             struct json *op;
957
958             op = json_object_create();
959             json_array_add(operations, op);
960             json_object_put_string(op, "op", "declare");
961             json_object_put(op, "uuid-name",
962                             json_string_create_nocopy(
963                                 uuid_name_from_uuid(&row->uuid)));
964         }
965     }
966
967     /* Add updates. */
968     any_updates = false;
969     HMAP_FOR_EACH (row, struct ovsdb_idl_row, txn_node, &txn->txn_rows) {
970         const struct ovsdb_idl_table_class *class = row->table->class;
971         size_t n_columns = class->n_columns;
972         struct json *row_json;
973         size_t idx;
974
975         if (row->old == row->new) {
976             continue;
977         } else if (!row->new) {
978             struct json *op = json_object_create();
979             json_object_put_string(op, "op", "delete");
980             json_object_put_string(op, "table", class->name);
981             json_object_put(op, "where", where_uuid_equals(&row->uuid));
982             json_array_add(operations, op);
983         } else {
984             row_json = NULL;
985             BITMAP_FOR_EACH_1 (idx, n_columns, row->written) {
986                 const struct ovsdb_idl_column *column = &class->columns[idx];
987
988                 if (row->old
989                     && ovsdb_datum_equals(&row->old[idx], &row->new[idx],
990                                           &column->type)) {
991                     continue;
992                 }
993                 if (!row_json) {
994                     struct json *op = json_object_create();
995                     json_array_add(operations, op);
996                     json_object_put_string(op, "op",
997                                            row->old ? "update" : "insert");
998                     json_object_put_string(op, "table", class->name);
999                     if (row->old) {
1000                         json_object_put(op, "where",
1001                                         where_uuid_equals(&row->uuid));
1002                     } else {
1003                         json_object_put(op, "uuid-name",
1004                                         json_string_create_nocopy(
1005                                             uuid_name_from_uuid(&row->uuid)));
1006                     }
1007                     row_json = json_object_create();
1008                     json_object_put(op, "row", row_json);
1009                     any_updates = true;
1010                 }
1011                 json_object_put(row_json, column->name,
1012                                 substitute_uuids(
1013                                     ovsdb_datum_to_json(&row->new[idx],
1014                                                         &column->type),
1015                                     txn));
1016             }
1017         }
1018     }
1019
1020     if (!any_updates) {
1021         txn->status = TXN_SUCCESS;
1022     } else if (!jsonrpc_session_send(
1023                    txn->idl->session,
1024                    jsonrpc_create_request(
1025                        "transact", operations, &txn->request_id))) {
1026         hmap_insert(&txn->idl->outstanding_txns, &txn->hmap_node,
1027                     json_hash(txn->request_id, 0));
1028     } else {
1029         txn->status = TXN_INCOMPLETE;
1030     }
1031
1032     txn->idl->txn = NULL;
1033     ovsdb_idl_txn_disassemble(txn);
1034     return txn->status;
1035 }
1036
1037 void
1038 ovsdb_idl_txn_abort(struct ovsdb_idl_txn *txn)
1039 {
1040     ovsdb_idl_txn_disassemble(txn);
1041     if (txn->status == TXN_INCOMPLETE) {
1042         txn->status = TXN_ABORTED;
1043     }
1044 }
1045
1046 static void
1047 ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
1048                        enum ovsdb_idl_txn_status status)
1049 {
1050     txn->status = status;
1051     hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
1052 }
1053
1054 void
1055 ovsdb_idl_txn_write(struct ovsdb_idl_row *row,
1056                     const struct ovsdb_idl_column *column,
1057                     struct ovsdb_datum *datum)
1058 {
1059     const struct ovsdb_idl_table_class *class = row->table->class;
1060     size_t column_idx = column - class->columns;
1061
1062     assert(row->new);
1063     if (hmap_node_is_null(&row->txn_node)) {
1064         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
1065                     uuid_hash(&row->uuid));
1066     }
1067     if (row->old == row->new) {
1068         row->new = xmalloc(class->n_columns * sizeof *row->new);
1069     }
1070     if (!row->written) {
1071         row->written = bitmap_allocate(class->n_columns);
1072     }
1073     if (bitmap_is_set(row->written, column_idx)) {
1074         ovsdb_datum_destroy(&row->new[column_idx], &column->type);
1075     } else {
1076         bitmap_set1(row->written, column_idx);
1077     }
1078     row->new[column_idx] = *datum;
1079 }
1080
1081 void
1082 ovsdb_idl_txn_verify(const struct ovsdb_idl_row *row_,
1083                      const struct ovsdb_idl_column *column)
1084 {
1085     struct ovsdb_idl_row *row = (struct ovsdb_idl_row *) row_;
1086     const struct ovsdb_idl_table_class *class = row->table->class;
1087     size_t column_idx = column - class->columns;
1088
1089     assert(row->new);
1090     if (!row->old
1091         || (row->written && bitmap_is_set(row->written, column_idx))) {
1092         return;
1093     }
1094
1095     if (hmap_node_is_null(&row->txn_node)) {
1096         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
1097                     uuid_hash(&row->uuid));
1098     }
1099     if (!row->prereqs) {
1100         row->prereqs = bitmap_allocate(class->n_columns);
1101     }
1102     bitmap_set1(row->prereqs, column_idx);
1103 }
1104
1105 void
1106 ovsdb_idl_txn_delete(struct ovsdb_idl_row *row)
1107 {
1108     assert(row->new);
1109     if (!row->old) {
1110         ovsdb_idl_row_clear_new(row);
1111         assert(!row->prereqs);
1112         hmap_remove(&row->table->idl->txn->txn_rows, &row->txn_node);
1113         free(row);
1114     }
1115     if (hmap_node_is_null(&row->txn_node)) {
1116         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
1117                     uuid_hash(&row->uuid));
1118     }
1119     if (row->new == row->old) {
1120         row->new = NULL;
1121     } else {
1122         ovsdb_idl_row_clear_new(row);
1123     }
1124 }
1125
1126 struct ovsdb_idl_row *
1127 ovsdb_idl_txn_insert(struct ovsdb_idl_txn *txn,
1128                      const struct ovsdb_idl_table_class *class)
1129 {
1130     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(class);
1131     uuid_generate(&row->uuid);
1132     row->table = ovsdb_idl_table_from_class(txn->idl, class);
1133     row->new = xmalloc(class->n_columns * sizeof *row->new);
1134     row->written = bitmap_allocate(class->n_columns);
1135     hmap_insert(&txn->txn_rows, &row->txn_node, uuid_hash(&row->uuid));
1136     return row;
1137 }
1138
1139 static void
1140 ovsdb_idl_txn_abort_all(struct ovsdb_idl *idl)
1141 {
1142     struct ovsdb_idl_txn *txn;
1143
1144     HMAP_FOR_EACH (txn, struct ovsdb_idl_txn, hmap_node,
1145                    &idl->outstanding_txns) {
1146         ovsdb_idl_txn_complete(txn, TXN_TRY_AGAIN);
1147     }
1148 }
1149
1150 static struct ovsdb_idl_txn *
1151 ovsdb_idl_txn_find(struct ovsdb_idl *idl, const struct json *id)
1152 {
1153     struct ovsdb_idl_txn *txn;
1154
1155     HMAP_FOR_EACH_WITH_HASH (txn, struct ovsdb_idl_txn, hmap_node,
1156                              json_hash(id, 0), &idl->outstanding_txns) {
1157         if (json_equal(id, txn->request_id)) {
1158             return txn;
1159         }
1160     }
1161     return NULL;
1162 }
1163
1164 static bool
1165 ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
1166                             const struct jsonrpc_msg *msg)
1167 {
1168     struct ovsdb_idl_txn *txn;
1169     enum ovsdb_idl_txn_status status;
1170
1171     txn = ovsdb_idl_txn_find(idl, msg->id);
1172     if (!txn) {
1173         return false;
1174     }
1175
1176     if (msg->type == JSONRPC_ERROR) {
1177         status = TXN_ERROR;
1178     } else if (msg->result->type != JSON_ARRAY) {
1179         VLOG_WARN_RL(&syntax_rl, "reply to \"transact\" is not JSON array");
1180         status = TXN_ERROR;
1181     } else {
1182         int hard_errors = 0;
1183         int soft_errors = 0;
1184         size_t i;
1185
1186         for (i = 0; i < msg->result->u.array.n; i++) {
1187             struct json *json = msg->result->u.array.elems[i];
1188
1189             if (json->type == JSON_NULL) {
1190                 /* This isn't an error in itself but indicates that some prior
1191                  * operation failed, so make sure that we know about it. */
1192                 soft_errors++;
1193             } else if (json->type == JSON_OBJECT) {
1194                 struct json *error;
1195
1196                 error = shash_find_data(json_object(json), "error");
1197                 if (error) {
1198                     if (error->type == JSON_STRING) {
1199                         if (!strcmp(error->u.string, "timed out")) {
1200                             soft_errors++;
1201                         } else {
1202                             hard_errors++;
1203                         }
1204                     } else {
1205                         hard_errors++;
1206                         VLOG_WARN_RL(&syntax_rl,
1207                                      "\"error\" in reply is not JSON string");
1208                     }
1209                 }
1210             } else {
1211                 hard_errors++;
1212                 VLOG_WARN_RL(&syntax_rl,
1213                              "operation reply is not JSON null or object");
1214             }
1215         }
1216
1217         status = (hard_errors ? TXN_ERROR
1218                   : soft_errors ? TXN_TRY_AGAIN
1219                   : TXN_SUCCESS);
1220     }
1221
1222     ovsdb_idl_txn_complete(txn, status);
1223     return true;
1224 }
1225
1226 struct ovsdb_idl_txn *
1227 ovsdb_idl_txn_get(const struct ovsdb_idl_row *row)
1228 {
1229     struct ovsdb_idl_txn *txn = row->table->idl->txn;
1230     assert(txn != NULL);
1231     return txn;
1232 }