Setting tag sliver-openvswitch-2.2.90-1
[sliver-openvswitch.git] / lib / ovsdb-idl.c
1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "ovsdb-idl.h"
19
20 #include <errno.h>
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <stdlib.h>
24
25 #include "bitmap.h"
26 #include "dynamic-string.h"
27 #include "fatal-signal.h"
28 #include "json.h"
29 #include "jsonrpc.h"
30 #include "ovsdb-data.h"
31 #include "ovsdb-error.h"
32 #include "ovsdb-idl-provider.h"
33 #include "poll-loop.h"
34 #include "shash.h"
35 #include "util.h"
36 #include "vlog.h"
37
38 VLOG_DEFINE_THIS_MODULE(ovsdb_idl);
39
40 /* An arc from one idl_row to another.  When row A contains a UUID that
41  * references row B, this is represented by an arc from A (the source) to B
42  * (the destination).
43  *
44  * Arcs from a row to itself are omitted, that is, src and dst are always
45  * different.
46  *
47  * Arcs are never duplicated, that is, even if there are multiple references
48  * from A to B, there is only a single arc from A to B.
49  *
50  * Arcs are directed: an arc from A to B is the converse of an an arc from B to
51  * A.  Both an arc and its converse may both be present, if each row refers
52  * to the other circularly.
53  *
54  * The source and destination row may be in the same table or in different
55  * tables.
56  */
57 struct ovsdb_idl_arc {
58     struct list src_node;       /* In src->src_arcs list. */
59     struct list dst_node;       /* In dst->dst_arcs list. */
60     struct ovsdb_idl_row *src;  /* Source row. */
61     struct ovsdb_idl_row *dst;  /* Destination row. */
62 };
63
64 struct ovsdb_idl {
65     const struct ovsdb_idl_class *class;
66     struct jsonrpc_session *session;
67     struct shash table_by_name;
68     struct ovsdb_idl_table *tables; /* Contains "struct ovsdb_idl_table *"s.*/
69     struct json *monitor_request_id;
70     unsigned int last_monitor_request_seqno;
71     unsigned int change_seqno;
72     bool verify_write_only;
73
74     /* Database locking. */
75     char *lock_name;            /* Name of lock we need, NULL if none. */
76     bool has_lock;              /* Has db server told us we have the lock? */
77     bool is_lock_contended;     /* Has db server told us we can't get lock? */
78     struct json *lock_request_id; /* JSON-RPC ID of in-flight lock request. */
79
80     /* Transaction support. */
81     struct ovsdb_idl_txn *txn;
82     struct hmap outstanding_txns;
83 };
84
85 struct ovsdb_idl_txn {
86     struct hmap_node hmap_node;
87     struct json *request_id;
88     struct ovsdb_idl *idl;
89     struct hmap txn_rows;
90     enum ovsdb_idl_txn_status status;
91     char *error;
92     bool dry_run;
93     struct ds comment;
94
95     /* Increments. */
96     const char *inc_table;
97     const char *inc_column;
98     struct uuid inc_row;
99     unsigned int inc_index;
100     int64_t inc_new_value;
101
102     /* Inserted rows. */
103     struct hmap inserted_rows;  /* Contains "struct ovsdb_idl_txn_insert"s. */
104 };
105
106 struct ovsdb_idl_txn_insert {
107     struct hmap_node hmap_node; /* In struct ovsdb_idl_txn's inserted_rows. */
108     struct uuid dummy;          /* Dummy UUID used locally. */
109     int op_index;               /* Index into transaction's operation array. */
110     struct uuid real;           /* Real UUID used by database server. */
111 };
112
113 static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);
114 static struct vlog_rate_limit semantic_rl = VLOG_RATE_LIMIT_INIT(1, 5);
115
116 static void ovsdb_idl_clear(struct ovsdb_idl *);
117 static void ovsdb_idl_send_monitor_request(struct ovsdb_idl *);
118 static void ovsdb_idl_parse_update(struct ovsdb_idl *, const struct json *);
119 static struct ovsdb_error *ovsdb_idl_parse_update__(struct ovsdb_idl *,
120                                                     const struct json *);
121 static bool ovsdb_idl_process_update(struct ovsdb_idl_table *,
122                                      const struct uuid *,
123                                      const struct json *old,
124                                      const struct json *new);
125 static void ovsdb_idl_insert_row(struct ovsdb_idl_row *, const struct json *);
126 static void ovsdb_idl_delete_row(struct ovsdb_idl_row *);
127 static bool ovsdb_idl_modify_row(struct ovsdb_idl_row *, const struct json *);
128
129 static bool ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *);
130 static struct ovsdb_idl_row *ovsdb_idl_row_create__(
131     const struct ovsdb_idl_table_class *);
132 static struct ovsdb_idl_row *ovsdb_idl_row_create(struct ovsdb_idl_table *,
133                                                   const struct uuid *);
134 static void ovsdb_idl_row_destroy(struct ovsdb_idl_row *);
135
136 static void ovsdb_idl_row_parse(struct ovsdb_idl_row *);
137 static void ovsdb_idl_row_unparse(struct ovsdb_idl_row *);
138 static void ovsdb_idl_row_clear_old(struct ovsdb_idl_row *);
139 static void ovsdb_idl_row_clear_new(struct ovsdb_idl_row *);
140
141 static void ovsdb_idl_txn_abort_all(struct ovsdb_idl *);
142 static bool ovsdb_idl_txn_process_reply(struct ovsdb_idl *,
143                                         const struct jsonrpc_msg *msg);
144
145 static void ovsdb_idl_send_lock_request(struct ovsdb_idl *);
146 static void ovsdb_idl_send_unlock_request(struct ovsdb_idl *);
147 static void ovsdb_idl_parse_lock_reply(struct ovsdb_idl *,
148                                        const struct json *);
149 static void ovsdb_idl_parse_lock_notify(struct ovsdb_idl *,
150                                         const struct json *params,
151                                         bool new_has_lock);
152
153 /* Creates and returns a connection to database 'remote', which should be in a
154  * form acceptable to jsonrpc_session_open().  The connection will maintain an
155  * in-memory replica of the remote database whose schema is described by
156  * 'class'.  (Ordinarily 'class' is compiled from an OVSDB schema automatically
157  * by ovsdb-idlc.)
158  *
159  * Passes 'retry' to jsonrpc_session_open().  See that function for
160  * documentation.
161  *
162  * If 'monitor_everything_by_default' is true, then everything in the remote
163  * database will be replicated by default.  ovsdb_idl_omit() and
164  * ovsdb_idl_omit_alert() may be used to selectively drop some columns from
165  * monitoring.
166  *
167  * If 'monitor_everything_by_default' is false, then no columns or tables will
168  * be replicated by default.  ovsdb_idl_add_column() and ovsdb_idl_add_table()
169  * must be used to choose some columns or tables to replicate.
170  */
171 struct ovsdb_idl *
172 ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class,
173                  bool monitor_everything_by_default, bool retry)
174 {
175     struct ovsdb_idl *idl;
176     uint8_t default_mode;
177     size_t i;
178
179     default_mode = (monitor_everything_by_default
180                     ? OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT
181                     : 0);
182
183     idl = xzalloc(sizeof *idl);
184     idl->class = class;
185     idl->session = jsonrpc_session_open(remote, retry);
186     shash_init(&idl->table_by_name);
187     idl->tables = xmalloc(class->n_tables * sizeof *idl->tables);
188     for (i = 0; i < class->n_tables; i++) {
189         const struct ovsdb_idl_table_class *tc = &class->tables[i];
190         struct ovsdb_idl_table *table = &idl->tables[i];
191         size_t j;
192
193         shash_add_assert(&idl->table_by_name, tc->name, table);
194         table->class = tc;
195         table->modes = xmalloc(tc->n_columns);
196         memset(table->modes, default_mode, tc->n_columns);
197         table->need_table = false;
198         shash_init(&table->columns);
199         for (j = 0; j < tc->n_columns; j++) {
200             const struct ovsdb_idl_column *column = &tc->columns[j];
201
202             shash_add_assert(&table->columns, column->name, column);
203         }
204         hmap_init(&table->rows);
205         table->idl = idl;
206     }
207     idl->last_monitor_request_seqno = UINT_MAX;
208     hmap_init(&idl->outstanding_txns);
209
210     return idl;
211 }
212
213 /* Destroys 'idl' and all of the data structures that it manages. */
214 void
215 ovsdb_idl_destroy(struct ovsdb_idl *idl)
216 {
217     if (idl) {
218         size_t i;
219
220         ovs_assert(!idl->txn);
221         ovsdb_idl_clear(idl);
222         jsonrpc_session_close(idl->session);
223
224         for (i = 0; i < idl->class->n_tables; i++) {
225             struct ovsdb_idl_table *table = &idl->tables[i];
226             shash_destroy(&table->columns);
227             hmap_destroy(&table->rows);
228             free(table->modes);
229         }
230         shash_destroy(&idl->table_by_name);
231         free(idl->tables);
232         json_destroy(idl->monitor_request_id);
233         free(idl->lock_name);
234         json_destroy(idl->lock_request_id);
235         hmap_destroy(&idl->outstanding_txns);
236         free(idl);
237     }
238 }
239
240 static void
241 ovsdb_idl_clear(struct ovsdb_idl *idl)
242 {
243     bool changed = false;
244     size_t i;
245
246     for (i = 0; i < idl->class->n_tables; i++) {
247         struct ovsdb_idl_table *table = &idl->tables[i];
248         struct ovsdb_idl_row *row, *next_row;
249
250         if (hmap_is_empty(&table->rows)) {
251             continue;
252         }
253
254         changed = true;
255         HMAP_FOR_EACH_SAFE (row, next_row, hmap_node, &table->rows) {
256             struct ovsdb_idl_arc *arc, *next_arc;
257
258             if (!ovsdb_idl_row_is_orphan(row)) {
259                 ovsdb_idl_row_unparse(row);
260             }
261             LIST_FOR_EACH_SAFE (arc, next_arc, src_node, &row->src_arcs) {
262                 free(arc);
263             }
264             /* No need to do anything with dst_arcs: some node has those arcs
265              * as forward arcs and will destroy them itself. */
266
267             ovsdb_idl_row_destroy(row);
268         }
269     }
270
271     if (changed) {
272         idl->change_seqno++;
273     }
274 }
275
276 /* Processes a batch of messages from the database server on 'idl'.  This may
277  * cause the IDL's contents to change.  The client may check for that with
278  * ovsdb_idl_get_seqno(). */
279 void
280 ovsdb_idl_run(struct ovsdb_idl *idl)
281 {
282     int i;
283
284     ovs_assert(!idl->txn);
285     jsonrpc_session_run(idl->session);
286     for (i = 0; jsonrpc_session_is_connected(idl->session) && i < 50; i++) {
287         struct jsonrpc_msg *msg;
288         unsigned int seqno;
289
290         seqno = jsonrpc_session_get_seqno(idl->session);
291         if (idl->last_monitor_request_seqno != seqno) {
292             idl->last_monitor_request_seqno = seqno;
293             ovsdb_idl_txn_abort_all(idl);
294             ovsdb_idl_send_monitor_request(idl);
295             if (idl->lock_name) {
296                 ovsdb_idl_send_lock_request(idl);
297             }
298             break;
299         }
300
301         msg = jsonrpc_session_recv(idl->session);
302         if (!msg) {
303             break;
304         }
305
306         if (msg->type == JSONRPC_NOTIFY
307             && !strcmp(msg->method, "update")
308             && msg->params->type == JSON_ARRAY
309             && msg->params->u.array.n == 2
310             && msg->params->u.array.elems[0]->type == JSON_NULL) {
311             /* Database contents changed. */
312             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1]);
313         } else if (msg->type == JSONRPC_REPLY
314                    && idl->monitor_request_id
315                    && json_equal(idl->monitor_request_id, msg->id)) {
316             /* Reply to our "monitor" request. */
317             idl->change_seqno++;
318             json_destroy(idl->monitor_request_id);
319             idl->monitor_request_id = NULL;
320             ovsdb_idl_clear(idl);
321             ovsdb_idl_parse_update(idl, msg->result);
322         } else if (msg->type == JSONRPC_REPLY
323                    && idl->lock_request_id
324                    && json_equal(idl->lock_request_id, msg->id)) {
325             /* Reply to our "lock" request. */
326             ovsdb_idl_parse_lock_reply(idl, msg->result);
327         } else if (msg->type == JSONRPC_NOTIFY
328                    && !strcmp(msg->method, "locked")) {
329             /* We got our lock. */
330             ovsdb_idl_parse_lock_notify(idl, msg->params, true);
331         } else if (msg->type == JSONRPC_NOTIFY
332                    && !strcmp(msg->method, "stolen")) {
333             /* Someone else stole our lock. */
334             ovsdb_idl_parse_lock_notify(idl, msg->params, false);
335         } else if ((msg->type == JSONRPC_ERROR
336                     || msg->type == JSONRPC_REPLY)
337                    && ovsdb_idl_txn_process_reply(idl, msg)) {
338             /* ovsdb_idl_txn_process_reply() did everything needful. */
339         } else {
340             /* This can happen if ovsdb_idl_txn_destroy() is called to destroy
341              * a transaction before we receive the reply, so keep the log level
342              * low. */
343             VLOG_DBG("%s: received unexpected %s message",
344                      jsonrpc_session_get_name(idl->session),
345                      jsonrpc_msg_type_to_string(msg->type));
346         }
347         jsonrpc_msg_destroy(msg);
348     }
349 }
350
351 /* Arranges for poll_block() to wake up when ovsdb_idl_run() has something to
352  * do or when activity occurs on a transaction on 'idl'. */
353 void
354 ovsdb_idl_wait(struct ovsdb_idl *idl)
355 {
356     jsonrpc_session_wait(idl->session);
357     jsonrpc_session_recv_wait(idl->session);
358 }
359
360 /* Returns a "sequence number" that represents the state of 'idl'.  When
361  * ovsdb_idl_run() changes the database, the sequence number changes.  The
362  * initial fetch of the entire contents of the remote database is considered to
363  * be one kind of change.  Successfully acquiring a lock, if one has been
364  * configured with ovsdb_idl_set_lock(), is also considered to be a change.
365  *
366  * As long as the sequence number does not change, the client may continue to
367  * use any data structures it obtains from 'idl'.  But when it changes, the
368  * client must not access any of these data structures again, because they
369  * could have freed or reused for other purposes.
370  *
371  * The sequence number can occasionally change even if the database does not.
372  * This happens if the connection to the database drops and reconnects, which
373  * causes the database contents to be reloaded even if they didn't change.  (It
374  * could also happen if the database server sends out a "change" that reflects
375  * what the IDL already thought was in the database.  The database server is
376  * not supposed to do that, but bugs could in theory cause it to do so.) */
377 unsigned int
378 ovsdb_idl_get_seqno(const struct ovsdb_idl *idl)
379 {
380     return idl->change_seqno;
381 }
382
383 /* Returns true if 'idl' successfully connected to the remote database and
384  * retrieved its contents (even if the connection subsequently dropped and is
385  * in the process of reconnecting).  If so, then 'idl' contains an atomic
386  * snapshot of the database's contents (but it might be arbitrarily old if the
387  * connection dropped).
388  *
389  * Returns false if 'idl' has never connected or retrieved the database's
390  * contents.  If so, 'idl' is empty. */
391 bool
392 ovsdb_idl_has_ever_connected(const struct ovsdb_idl *idl)
393 {
394     return ovsdb_idl_get_seqno(idl) != 0;
395 }
396
397 /* Reconfigures 'idl' so that it would reconnect to the database, if
398  * connection was dropped. */
399 void
400 ovsdb_idl_enable_reconnect(struct ovsdb_idl *idl)
401 {
402     jsonrpc_session_enable_reconnect(idl->session);
403 }
404
405 /* Forces 'idl' to drop its connection to the database and reconnect.  In the
406  * meantime, the contents of 'idl' will not change. */
407 void
408 ovsdb_idl_force_reconnect(struct ovsdb_idl *idl)
409 {
410     jsonrpc_session_force_reconnect(idl->session);
411 }
412
413 /* Some IDL users should only write to write-only columns.  Furthermore,
414  * writing to a column which is not write-only can cause serious performance
415  * degradations for these users.  This function causes 'idl' to reject writes
416  * to columns which are not marked write only using ovsdb_idl_omit_alert(). */
417 void
418 ovsdb_idl_verify_write_only(struct ovsdb_idl *idl)
419 {
420     idl->verify_write_only = true;
421 }
422
423 bool
424 ovsdb_idl_is_alive(const struct ovsdb_idl *idl)
425 {
426     return jsonrpc_session_is_alive(idl->session);
427 }
428
429 int
430 ovsdb_idl_get_last_error(const struct ovsdb_idl *idl)
431 {
432     return jsonrpc_session_get_last_error(idl->session);
433 }
434 \f
435 static unsigned char *
436 ovsdb_idl_get_mode(struct ovsdb_idl *idl,
437                    const struct ovsdb_idl_column *column)
438 {
439     size_t i;
440
441     ovs_assert(!idl->change_seqno);
442
443     for (i = 0; i < idl->class->n_tables; i++) {
444         const struct ovsdb_idl_table *table = &idl->tables[i];
445         const struct ovsdb_idl_table_class *tc = table->class;
446
447         if (column >= tc->columns && column < &tc->columns[tc->n_columns]) {
448             return &table->modes[column - tc->columns];
449         }
450     }
451
452     OVS_NOT_REACHED();
453 }
454
455 static void
456 add_ref_table(struct ovsdb_idl *idl, const struct ovsdb_base_type *base)
457 {
458     if (base->type == OVSDB_TYPE_UUID && base->u.uuid.refTableName) {
459         struct ovsdb_idl_table *table;
460
461         table = shash_find_data(&idl->table_by_name,
462                                 base->u.uuid.refTableName);
463         if (table) {
464             table->need_table = true;
465         } else {
466             VLOG_WARN("%s IDL class missing referenced table %s",
467                       idl->class->database, base->u.uuid.refTableName);
468         }
469     }
470 }
471
472 /* Turns on OVSDB_IDL_MONITOR and OVSDB_IDL_ALERT for 'column' in 'idl'.  Also
473  * ensures that any tables referenced by 'column' will be replicated, even if
474  * no columns in that table are selected for replication (see
475  * ovsdb_idl_add_table() for more information).
476  *
477  * This function is only useful if 'monitor_everything_by_default' was false in
478  * the call to ovsdb_idl_create().  This function should be called between
479  * ovsdb_idl_create() and the first call to ovsdb_idl_run().
480  */
481 void
482 ovsdb_idl_add_column(struct ovsdb_idl *idl,
483                      const struct ovsdb_idl_column *column)
484 {
485     *ovsdb_idl_get_mode(idl, column) = OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT;
486     add_ref_table(idl, &column->type.key);
487     add_ref_table(idl, &column->type.value);
488 }
489
490 /* Ensures that the table with class 'tc' will be replicated on 'idl' even if
491  * no columns are selected for replication.  This can be useful because it
492  * allows 'idl' to keep track of what rows in the table actually exist, which
493  * in turn allows columns that reference the table to have accurate contents.
494  * (The IDL presents the database with references to rows that do not exist
495  * removed.)
496  *
497  * This function is only useful if 'monitor_everything_by_default' was false in
498  * the call to ovsdb_idl_create().  This function should be called between
499  * ovsdb_idl_create() and the first call to ovsdb_idl_run().
500  */
501 void
502 ovsdb_idl_add_table(struct ovsdb_idl *idl,
503                     const struct ovsdb_idl_table_class *tc)
504 {
505     size_t i;
506
507     for (i = 0; i < idl->class->n_tables; i++) {
508         struct ovsdb_idl_table *table = &idl->tables[i];
509
510         if (table->class == tc) {
511             table->need_table = true;
512             return;
513         }
514     }
515
516     OVS_NOT_REACHED();
517 }
518
519 /* Turns off OVSDB_IDL_ALERT for 'column' in 'idl'.
520  *
521  * This function should be called between ovsdb_idl_create() and the first call
522  * to ovsdb_idl_run().
523  */
524 void
525 ovsdb_idl_omit_alert(struct ovsdb_idl *idl,
526                      const struct ovsdb_idl_column *column)
527 {
528     *ovsdb_idl_get_mode(idl, column) &= ~OVSDB_IDL_ALERT;
529 }
530
531 /* Sets the mode for 'column' in 'idl' to 0.  See the big comment above
532  * OVSDB_IDL_MONITOR for details.
533  *
534  * This function should be called between ovsdb_idl_create() and the first call
535  * to ovsdb_idl_run().
536  */
537 void
538 ovsdb_idl_omit(struct ovsdb_idl *idl, const struct ovsdb_idl_column *column)
539 {
540     *ovsdb_idl_get_mode(idl, column) = 0;
541 }
542 \f
543 static void
544 ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl)
545 {
546     struct json *monitor_requests;
547     struct jsonrpc_msg *msg;
548     size_t i;
549
550     monitor_requests = json_object_create();
551     for (i = 0; i < idl->class->n_tables; i++) {
552         const struct ovsdb_idl_table *table = &idl->tables[i];
553         const struct ovsdb_idl_table_class *tc = table->class;
554         struct json *monitor_request, *columns;
555         size_t j;
556
557         columns = table->need_table ? json_array_create_empty() : NULL;
558         for (j = 0; j < tc->n_columns; j++) {
559             const struct ovsdb_idl_column *column = &tc->columns[j];
560             if (table->modes[j] & OVSDB_IDL_MONITOR) {
561                 if (!columns) {
562                     columns = json_array_create_empty();
563                 }
564                 json_array_add(columns, json_string_create(column->name));
565             }
566         }
567
568         if (columns) {
569             monitor_request = json_object_create();
570             json_object_put(monitor_request, "columns", columns);
571             json_object_put(monitor_requests, tc->name, monitor_request);
572         }
573     }
574
575     json_destroy(idl->monitor_request_id);
576     msg = jsonrpc_create_request(
577         "monitor",
578         json_array_create_3(json_string_create(idl->class->database),
579                             json_null_create(), monitor_requests),
580         &idl->monitor_request_id);
581     jsonrpc_session_send(idl->session, msg);
582 }
583
584 static void
585 ovsdb_idl_parse_update(struct ovsdb_idl *idl, const struct json *table_updates)
586 {
587     struct ovsdb_error *error = ovsdb_idl_parse_update__(idl, table_updates);
588     if (error) {
589         if (!VLOG_DROP_WARN(&syntax_rl)) {
590             char *s = ovsdb_error_to_string(error);
591             VLOG_WARN_RL(&syntax_rl, "%s", s);
592             free(s);
593         }
594         ovsdb_error_destroy(error);
595     }
596 }
597
598 static struct ovsdb_error *
599 ovsdb_idl_parse_update__(struct ovsdb_idl *idl,
600                          const struct json *table_updates)
601 {
602     const struct shash_node *tables_node;
603
604     if (table_updates->type != JSON_OBJECT) {
605         return ovsdb_syntax_error(table_updates, NULL,
606                                   "<table-updates> is not an object");
607     }
608     SHASH_FOR_EACH (tables_node, json_object(table_updates)) {
609         const struct json *table_update = tables_node->data;
610         const struct shash_node *table_node;
611         struct ovsdb_idl_table *table;
612
613         table = shash_find_data(&idl->table_by_name, tables_node->name);
614         if (!table) {
615             return ovsdb_syntax_error(
616                 table_updates, NULL,
617                 "<table-updates> includes unknown table \"%s\"",
618                 tables_node->name);
619         }
620
621         if (table_update->type != JSON_OBJECT) {
622             return ovsdb_syntax_error(table_update, NULL,
623                                       "<table-update> for table \"%s\" is "
624                                       "not an object", table->class->name);
625         }
626         SHASH_FOR_EACH (table_node, json_object(table_update)) {
627             const struct json *row_update = table_node->data;
628             const struct json *old_json, *new_json;
629             struct uuid uuid;
630
631             if (!uuid_from_string(&uuid, table_node->name)) {
632                 return ovsdb_syntax_error(table_update, NULL,
633                                           "<table-update> for table \"%s\" "
634                                           "contains bad UUID "
635                                           "\"%s\" as member name",
636                                           table->class->name,
637                                           table_node->name);
638             }
639             if (row_update->type != JSON_OBJECT) {
640                 return ovsdb_syntax_error(row_update, NULL,
641                                           "<table-update> for table \"%s\" "
642                                           "contains <row-update> for %s that "
643                                           "is not an object",
644                                           table->class->name,
645                                           table_node->name);
646             }
647
648             old_json = shash_find_data(json_object(row_update), "old");
649             new_json = shash_find_data(json_object(row_update), "new");
650             if (old_json && old_json->type != JSON_OBJECT) {
651                 return ovsdb_syntax_error(old_json, NULL,
652                                           "\"old\" <row> is not object");
653             } else if (new_json && new_json->type != JSON_OBJECT) {
654                 return ovsdb_syntax_error(new_json, NULL,
655                                           "\"new\" <row> is not object");
656             } else if ((old_json != NULL) + (new_json != NULL)
657                        != shash_count(json_object(row_update))) {
658                 return ovsdb_syntax_error(row_update, NULL,
659                                           "<row-update> contains unexpected "
660                                           "member");
661             } else if (!old_json && !new_json) {
662                 return ovsdb_syntax_error(row_update, NULL,
663                                           "<row-update> missing \"old\" "
664                                           "and \"new\" members");
665             }
666
667             if (ovsdb_idl_process_update(table, &uuid, old_json, new_json)) {
668                 idl->change_seqno++;
669             }
670         }
671     }
672
673     return NULL;
674 }
675
676 static struct ovsdb_idl_row *
677 ovsdb_idl_get_row(struct ovsdb_idl_table *table, const struct uuid *uuid)
678 {
679     struct ovsdb_idl_row *row;
680
681     HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid), &table->rows) {
682         if (uuid_equals(&row->uuid, uuid)) {
683             return row;
684         }
685     }
686     return NULL;
687 }
688
689 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
690  * otherwise. */
691 static bool
692 ovsdb_idl_process_update(struct ovsdb_idl_table *table,
693                          const struct uuid *uuid, const struct json *old,
694                          const struct json *new)
695 {
696     struct ovsdb_idl_row *row;
697
698     row = ovsdb_idl_get_row(table, uuid);
699     if (!new) {
700         /* Delete row. */
701         if (row && !ovsdb_idl_row_is_orphan(row)) {
702             /* XXX perhaps we should check the 'old' values? */
703             ovsdb_idl_delete_row(row);
704         } else {
705             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
706                          "from table %s",
707                          UUID_ARGS(uuid), table->class->name);
708             return false;
709         }
710     } else if (!old) {
711         /* Insert row. */
712         if (!row) {
713             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
714         } else if (ovsdb_idl_row_is_orphan(row)) {
715             ovsdb_idl_insert_row(row, new);
716         } else {
717             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
718                          "table %s", UUID_ARGS(uuid), table->class->name);
719             return ovsdb_idl_modify_row(row, new);
720         }
721     } else {
722         /* Modify row. */
723         if (row) {
724             /* XXX perhaps we should check the 'old' values? */
725             if (!ovsdb_idl_row_is_orphan(row)) {
726                 return ovsdb_idl_modify_row(row, new);
727             } else {
728                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
729                              "referenced row "UUID_FMT" in table %s",
730                              UUID_ARGS(uuid), table->class->name);
731                 ovsdb_idl_insert_row(row, new);
732             }
733         } else {
734             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
735                          "in table %s", UUID_ARGS(uuid), table->class->name);
736             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
737         }
738     }
739
740     return true;
741 }
742
743 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
744  * otherwise. */
745 static bool
746 ovsdb_idl_row_update(struct ovsdb_idl_row *row, const struct json *row_json)
747 {
748     struct ovsdb_idl_table *table = row->table;
749     struct shash_node *node;
750     bool changed = false;
751
752     SHASH_FOR_EACH (node, json_object(row_json)) {
753         const char *column_name = node->name;
754         const struct ovsdb_idl_column *column;
755         struct ovsdb_datum datum;
756         struct ovsdb_error *error;
757
758         column = shash_find_data(&table->columns, column_name);
759         if (!column) {
760             VLOG_WARN_RL(&syntax_rl, "unknown column %s updating row "UUID_FMT,
761                          column_name, UUID_ARGS(&row->uuid));
762             continue;
763         }
764
765         error = ovsdb_datum_from_json(&datum, &column->type, node->data, NULL);
766         if (!error) {
767             unsigned int column_idx = column - table->class->columns;
768             struct ovsdb_datum *old = &row->old[column_idx];
769
770             if (!ovsdb_datum_equals(old, &datum, &column->type)) {
771                 ovsdb_datum_swap(old, &datum);
772                 if (table->modes[column_idx] & OVSDB_IDL_ALERT) {
773                     changed = true;
774                 }
775             } else {
776                 /* Didn't really change but the OVSDB monitor protocol always
777                  * includes every value in a row. */
778             }
779
780             ovsdb_datum_destroy(&datum, &column->type);
781         } else {
782             char *s = ovsdb_error_to_string(error);
783             VLOG_WARN_RL(&syntax_rl, "error parsing column %s in row "UUID_FMT
784                          " in table %s: %s", column_name,
785                          UUID_ARGS(&row->uuid), table->class->name, s);
786             free(s);
787             ovsdb_error_destroy(error);
788         }
789     }
790     return changed;
791 }
792
793 /* When a row A refers to row B through a column with a "refTable" constraint,
794  * but row B does not exist, row B is called an "orphan row".  Orphan rows
795  * should not persist, because the database enforces referential integrity, but
796  * they can appear transiently as changes from the database are received (the
797  * database doesn't try to topologically sort them and circular references mean
798  * it isn't always possible anyhow).
799  *
800  * This function returns true if 'row' is an orphan row, otherwise false.
801  */
802 static bool
803 ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *row)
804 {
805     return !row->old && !row->new;
806 }
807
808 /* Returns true if 'row' is conceptually part of the database as modified by
809  * the current transaction (if any), false otherwise.
810  *
811  * This function will return true if 'row' is not an orphan (see the comment on
812  * ovsdb_idl_row_is_orphan()) and:
813  *
814  *   - 'row' exists in the database and has not been deleted within the
815  *     current transaction (if any).
816  *
817  *   - 'row' was inserted within the current transaction and has not been
818  *     deleted.  (In the latter case you should not have passed 'row' in at
819  *     all, because ovsdb_idl_txn_delete() freed it.)
820  *
821  * This function will return false if 'row' is an orphan or if 'row' was
822  * deleted within the current transaction.
823  */
824 static bool
825 ovsdb_idl_row_exists(const struct ovsdb_idl_row *row)
826 {
827     return row->new != NULL;
828 }
829
830 static void
831 ovsdb_idl_row_parse(struct ovsdb_idl_row *row)
832 {
833     const struct ovsdb_idl_table_class *class = row->table->class;
834     size_t i;
835
836     for (i = 0; i < class->n_columns; i++) {
837         const struct ovsdb_idl_column *c = &class->columns[i];
838         (c->parse)(row, &row->old[i]);
839     }
840 }
841
842 static void
843 ovsdb_idl_row_unparse(struct ovsdb_idl_row *row)
844 {
845     const struct ovsdb_idl_table_class *class = row->table->class;
846     size_t i;
847
848     for (i = 0; i < class->n_columns; i++) {
849         const struct ovsdb_idl_column *c = &class->columns[i];
850         (c->unparse)(row);
851     }
852 }
853
854 static void
855 ovsdb_idl_row_clear_old(struct ovsdb_idl_row *row)
856 {
857     ovs_assert(row->old == row->new);
858     if (!ovsdb_idl_row_is_orphan(row)) {
859         const struct ovsdb_idl_table_class *class = row->table->class;
860         size_t i;
861
862         for (i = 0; i < class->n_columns; i++) {
863             ovsdb_datum_destroy(&row->old[i], &class->columns[i].type);
864         }
865         free(row->old);
866         row->old = row->new = NULL;
867     }
868 }
869
870 static void
871 ovsdb_idl_row_clear_new(struct ovsdb_idl_row *row)
872 {
873     if (row->old != row->new) {
874         if (row->new) {
875             const struct ovsdb_idl_table_class *class = row->table->class;
876             size_t i;
877
878             if (row->written) {
879                 BITMAP_FOR_EACH_1 (i, class->n_columns, row->written) {
880                     ovsdb_datum_destroy(&row->new[i], &class->columns[i].type);
881                 }
882             }
883             free(row->new);
884             free(row->written);
885             row->written = NULL;
886         }
887         row->new = row->old;
888     }
889 }
890
891 static void
892 ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *row, bool destroy_dsts)
893 {
894     struct ovsdb_idl_arc *arc, *next;
895
896     /* Delete all forward arcs.  If 'destroy_dsts', destroy any orphaned rows
897      * that this causes to be unreferenced. */
898     LIST_FOR_EACH_SAFE (arc, next, src_node, &row->src_arcs) {
899         list_remove(&arc->dst_node);
900         if (destroy_dsts
901             && ovsdb_idl_row_is_orphan(arc->dst)
902             && list_is_empty(&arc->dst->dst_arcs)) {
903             ovsdb_idl_row_destroy(arc->dst);
904         }
905         free(arc);
906     }
907     list_init(&row->src_arcs);
908 }
909
910 /* Force nodes that reference 'row' to reparse. */
911 static void
912 ovsdb_idl_row_reparse_backrefs(struct ovsdb_idl_row *row)
913 {
914     struct ovsdb_idl_arc *arc, *next;
915
916     /* This is trickier than it looks.  ovsdb_idl_row_clear_arcs() will destroy
917      * 'arc', so we need to use the "safe" variant of list traversal.  However,
918      * calling an ovsdb_idl_column's 'parse' function will add an arc
919      * equivalent to 'arc' to row->arcs.  That could be a problem for
920      * traversal, but it adds it at the beginning of the list to prevent us
921      * from stumbling upon it again.
922      *
923      * (If duplicate arcs were possible then we would need to make sure that
924      * 'next' didn't also point into 'arc''s destination, but we forbid
925      * duplicate arcs.) */
926     LIST_FOR_EACH_SAFE (arc, next, dst_node, &row->dst_arcs) {
927         struct ovsdb_idl_row *ref = arc->src;
928
929         ovsdb_idl_row_unparse(ref);
930         ovsdb_idl_row_clear_arcs(ref, false);
931         ovsdb_idl_row_parse(ref);
932     }
933 }
934
935 static struct ovsdb_idl_row *
936 ovsdb_idl_row_create__(const struct ovsdb_idl_table_class *class)
937 {
938     struct ovsdb_idl_row *row = xzalloc(class->allocation_size);
939     class->row_init(row);
940     list_init(&row->src_arcs);
941     list_init(&row->dst_arcs);
942     hmap_node_nullify(&row->txn_node);
943     return row;
944 }
945
946 static struct ovsdb_idl_row *
947 ovsdb_idl_row_create(struct ovsdb_idl_table *table, const struct uuid *uuid)
948 {
949     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(table->class);
950     hmap_insert(&table->rows, &row->hmap_node, uuid_hash(uuid));
951     row->uuid = *uuid;
952     row->table = table;
953     return row;
954 }
955
956 static void
957 ovsdb_idl_row_destroy(struct ovsdb_idl_row *row)
958 {
959     if (row) {
960         ovsdb_idl_row_clear_old(row);
961         hmap_remove(&row->table->rows, &row->hmap_node);
962         free(row);
963     }
964 }
965
966 static void
967 ovsdb_idl_insert_row(struct ovsdb_idl_row *row, const struct json *row_json)
968 {
969     const struct ovsdb_idl_table_class *class = row->table->class;
970     size_t i;
971
972     ovs_assert(!row->old && !row->new);
973     row->old = row->new = xmalloc(class->n_columns * sizeof *row->old);
974     for (i = 0; i < class->n_columns; i++) {
975         ovsdb_datum_init_default(&row->old[i], &class->columns[i].type);
976     }
977     ovsdb_idl_row_update(row, row_json);
978     ovsdb_idl_row_parse(row);
979
980     ovsdb_idl_row_reparse_backrefs(row);
981 }
982
983 static void
984 ovsdb_idl_delete_row(struct ovsdb_idl_row *row)
985 {
986     ovsdb_idl_row_unparse(row);
987     ovsdb_idl_row_clear_arcs(row, true);
988     ovsdb_idl_row_clear_old(row);
989     if (list_is_empty(&row->dst_arcs)) {
990         ovsdb_idl_row_destroy(row);
991     } else {
992         ovsdb_idl_row_reparse_backrefs(row);
993     }
994 }
995
996 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
997  * otherwise. */
998 static bool
999 ovsdb_idl_modify_row(struct ovsdb_idl_row *row, const struct json *row_json)
1000 {
1001     bool changed;
1002
1003     ovsdb_idl_row_unparse(row);
1004     ovsdb_idl_row_clear_arcs(row, true);
1005     changed = ovsdb_idl_row_update(row, row_json);
1006     ovsdb_idl_row_parse(row);
1007
1008     return changed;
1009 }
1010
1011 static bool
1012 may_add_arc(const struct ovsdb_idl_row *src, const struct ovsdb_idl_row *dst)
1013 {
1014     const struct ovsdb_idl_arc *arc;
1015
1016     /* No self-arcs. */
1017     if (src == dst) {
1018         return false;
1019     }
1020
1021     /* No duplicate arcs.
1022      *
1023      * We only need to test whether the first arc in dst->dst_arcs originates
1024      * at 'src', since we add all of the arcs from a given source in a clump
1025      * (in a single call to ovsdb_idl_row_parse()) and new arcs are always
1026      * added at the front of the dst_arcs list. */
1027     if (list_is_empty(&dst->dst_arcs)) {
1028         return true;
1029     }
1030     arc = CONTAINER_OF(dst->dst_arcs.next, struct ovsdb_idl_arc, dst_node);
1031     return arc->src != src;
1032 }
1033
1034 static struct ovsdb_idl_table *
1035 ovsdb_idl_table_from_class(const struct ovsdb_idl *idl,
1036                            const struct ovsdb_idl_table_class *table_class)
1037 {
1038     return &idl->tables[table_class - idl->class->tables];
1039 }
1040
1041 /* Called by ovsdb-idlc generated code. */
1042 struct ovsdb_idl_row *
1043 ovsdb_idl_get_row_arc(struct ovsdb_idl_row *src,
1044                       struct ovsdb_idl_table_class *dst_table_class,
1045                       const struct uuid *dst_uuid)
1046 {
1047     struct ovsdb_idl *idl = src->table->idl;
1048     struct ovsdb_idl_table *dst_table;
1049     struct ovsdb_idl_arc *arc;
1050     struct ovsdb_idl_row *dst;
1051
1052     dst_table = ovsdb_idl_table_from_class(idl, dst_table_class);
1053     dst = ovsdb_idl_get_row(dst_table, dst_uuid);
1054     if (idl->txn) {
1055         /* We're being called from ovsdb_idl_txn_write().  We must not update
1056          * any arcs, because the transaction will be backed out at commit or
1057          * abort time and we don't want our graph screwed up.
1058          *
1059          * Just return the destination row, if there is one and it has not been
1060          * deleted. */
1061         if (dst && (hmap_node_is_null(&dst->txn_node) || dst->new)) {
1062             return dst;
1063         }
1064         return NULL;
1065     } else {
1066         /* We're being called from some other context.  Update the graph. */
1067         if (!dst) {
1068             dst = ovsdb_idl_row_create(dst_table, dst_uuid);
1069         }
1070
1071         /* Add a new arc, if it wouldn't be a self-arc or a duplicate arc. */
1072         if (may_add_arc(src, dst)) {
1073             /* The arc *must* be added at the front of the dst_arcs list.  See
1074              * ovsdb_idl_row_reparse_backrefs() for details. */
1075             arc = xmalloc(sizeof *arc);
1076             list_push_front(&src->src_arcs, &arc->src_node);
1077             list_push_front(&dst->dst_arcs, &arc->dst_node);
1078             arc->src = src;
1079             arc->dst = dst;
1080         }
1081
1082         return !ovsdb_idl_row_is_orphan(dst) ? dst : NULL;
1083     }
1084 }
1085
1086 /* Searches 'tc''s table in 'idl' for a row with UUID 'uuid'.  Returns a
1087  * pointer to the row if there is one, otherwise a null pointer.  */
1088 const struct ovsdb_idl_row *
1089 ovsdb_idl_get_row_for_uuid(const struct ovsdb_idl *idl,
1090                            const struct ovsdb_idl_table_class *tc,
1091                            const struct uuid *uuid)
1092 {
1093     return ovsdb_idl_get_row(ovsdb_idl_table_from_class(idl, tc), uuid);
1094 }
1095
1096 static struct ovsdb_idl_row *
1097 next_real_row(struct ovsdb_idl_table *table, struct hmap_node *node)
1098 {
1099     for (; node; node = hmap_next(&table->rows, node)) {
1100         struct ovsdb_idl_row *row;
1101
1102         row = CONTAINER_OF(node, struct ovsdb_idl_row, hmap_node);
1103         if (ovsdb_idl_row_exists(row)) {
1104             return row;
1105         }
1106     }
1107     return NULL;
1108 }
1109
1110 /* Returns a row in 'table_class''s table in 'idl', or a null pointer if that
1111  * table is empty.
1112  *
1113  * Database tables are internally maintained as hash tables, so adding or
1114  * removing rows while traversing the same table can cause some rows to be
1115  * visited twice or not at apply. */
1116 const struct ovsdb_idl_row *
1117 ovsdb_idl_first_row(const struct ovsdb_idl *idl,
1118                     const struct ovsdb_idl_table_class *table_class)
1119 {
1120     struct ovsdb_idl_table *table
1121         = ovsdb_idl_table_from_class(idl, table_class);
1122     return next_real_row(table, hmap_first(&table->rows));
1123 }
1124
1125 /* Returns a row following 'row' within its table, or a null pointer if 'row'
1126  * is the last row in its table. */
1127 const struct ovsdb_idl_row *
1128 ovsdb_idl_next_row(const struct ovsdb_idl_row *row)
1129 {
1130     struct ovsdb_idl_table *table = row->table;
1131
1132     return next_real_row(table, hmap_next(&table->rows, &row->hmap_node));
1133 }
1134
1135 /* Reads and returns the value of 'column' within 'row'.  If an ongoing
1136  * transaction has changed 'column''s value, the modified value is returned.
1137  *
1138  * The caller must not modify or free the returned value.
1139  *
1140  * Various kinds of changes can invalidate the returned value: writing to the
1141  * same 'column' in 'row' (e.g. with ovsdb_idl_txn_write()), deleting 'row'
1142  * (e.g. with ovsdb_idl_txn_delete()), or completing an ongoing transaction
1143  * (e.g. with ovsdb_idl_txn_commit() or ovsdb_idl_txn_abort()).  If the
1144  * returned value is needed for a long time, it is best to make a copy of it
1145  * with ovsdb_datum_clone(). */
1146 const struct ovsdb_datum *
1147 ovsdb_idl_read(const struct ovsdb_idl_row *row,
1148                const struct ovsdb_idl_column *column)
1149 {
1150     const struct ovsdb_idl_table_class *class;
1151     size_t column_idx;
1152
1153     ovs_assert(!ovsdb_idl_row_is_synthetic(row));
1154
1155     class = row->table->class;
1156     column_idx = column - class->columns;
1157
1158     ovs_assert(row->new != NULL);
1159     ovs_assert(column_idx < class->n_columns);
1160
1161     if (row->written && bitmap_is_set(row->written, column_idx)) {
1162         return &row->new[column_idx];
1163     } else if (row->old) {
1164         return &row->old[column_idx];
1165     } else {
1166         return ovsdb_datum_default(&column->type);
1167     }
1168 }
1169
1170 /* Same as ovsdb_idl_read(), except that it also asserts that 'column' has key
1171  * type 'key_type' and value type 'value_type'.  (Scalar and set types will
1172  * have a value type of OVSDB_TYPE_VOID.)
1173  *
1174  * This is useful in code that "knows" that a particular column has a given
1175  * type, so that it will abort if someone changes the column's type without
1176  * updating the code that uses it. */
1177 const struct ovsdb_datum *
1178 ovsdb_idl_get(const struct ovsdb_idl_row *row,
1179               const struct ovsdb_idl_column *column,
1180               enum ovsdb_atomic_type key_type OVS_UNUSED,
1181               enum ovsdb_atomic_type value_type OVS_UNUSED)
1182 {
1183     ovs_assert(column->type.key.type == key_type);
1184     ovs_assert(column->type.value.type == value_type);
1185
1186     return ovsdb_idl_read(row, column);
1187 }
1188
1189 /* Returns false if 'row' was obtained from the IDL, true if it was initialized
1190  * to all-zero-bits by some other entity.  If 'row' was set up some other way
1191  * then the return value is indeterminate. */
1192 bool
1193 ovsdb_idl_row_is_synthetic(const struct ovsdb_idl_row *row)
1194 {
1195     return row->table == NULL;
1196 }
1197 \f
1198 /* Transactions. */
1199
1200 static void ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
1201                                    enum ovsdb_idl_txn_status);
1202
1203 /* Returns a string representation of 'status'.  The caller must not modify or
1204  * free the returned string.
1205  *
1206  * The return value is probably useful only for debug log messages and unit
1207  * tests. */
1208 const char *
1209 ovsdb_idl_txn_status_to_string(enum ovsdb_idl_txn_status status)
1210 {
1211     switch (status) {
1212     case TXN_UNCOMMITTED:
1213         return "uncommitted";
1214     case TXN_UNCHANGED:
1215         return "unchanged";
1216     case TXN_INCOMPLETE:
1217         return "incomplete";
1218     case TXN_ABORTED:
1219         return "aborted";
1220     case TXN_SUCCESS:
1221         return "success";
1222     case TXN_TRY_AGAIN:
1223         return "try again";
1224     case TXN_NOT_LOCKED:
1225         return "not locked";
1226     case TXN_ERROR:
1227         return "error";
1228     }
1229     return "<unknown>";
1230 }
1231
1232 /* Starts a new transaction on 'idl'.  A given ovsdb_idl may only have a single
1233  * active transaction at a time.  See the large comment in ovsdb-idl.h for
1234  * general information on transactions. */
1235 struct ovsdb_idl_txn *
1236 ovsdb_idl_txn_create(struct ovsdb_idl *idl)
1237 {
1238     struct ovsdb_idl_txn *txn;
1239
1240     ovs_assert(!idl->txn);
1241     idl->txn = txn = xmalloc(sizeof *txn);
1242     txn->request_id = NULL;
1243     txn->idl = idl;
1244     hmap_init(&txn->txn_rows);
1245     txn->status = TXN_UNCOMMITTED;
1246     txn->error = NULL;
1247     txn->dry_run = false;
1248     ds_init(&txn->comment);
1249
1250     txn->inc_table = NULL;
1251     txn->inc_column = NULL;
1252
1253     hmap_init(&txn->inserted_rows);
1254
1255     return txn;
1256 }
1257
1258 /* Appends 's', which is treated as a printf()-type format string, to the
1259  * comments that will be passed to the OVSDB server when 'txn' is committed.
1260  * (The comment will be committed to the OVSDB log, which "ovsdb-tool
1261  * show-log" can print in a relatively human-readable form.) */
1262 void
1263 ovsdb_idl_txn_add_comment(struct ovsdb_idl_txn *txn, const char *s, ...)
1264 {
1265     va_list args;
1266
1267     if (txn->comment.length) {
1268         ds_put_char(&txn->comment, '\n');
1269     }
1270
1271     va_start(args, s);
1272     ds_put_format_valist(&txn->comment, s, args);
1273     va_end(args);
1274 }
1275
1276 /* Marks 'txn' as a transaction that will not actually modify the database.  In
1277  * almost every way, the transaction is treated like other transactions.  It
1278  * must be committed or aborted like other transactions, it will be sent to the
1279  * database server like other transactions, and so on.  The only difference is
1280  * that the operations sent to the database server will include, as the last
1281  * step, an "abort" operation, so that any changes made by the transaction will
1282  * not actually take effect. */
1283 void
1284 ovsdb_idl_txn_set_dry_run(struct ovsdb_idl_txn *txn)
1285 {
1286     txn->dry_run = true;
1287 }
1288
1289 /* Causes 'txn', when committed, to increment the value of 'column' within
1290  * 'row' by 1.  'column' must have an integer type.  After 'txn' commits
1291  * successfully, the client may retrieve the final (incremented) value of
1292  * 'column' with ovsdb_idl_txn_get_increment_new_value().
1293  *
1294  * The client could accomplish something similar with ovsdb_idl_read(),
1295  * ovsdb_idl_txn_verify() and ovsdb_idl_txn_write(), or with ovsdb-idlc
1296  * generated wrappers for these functions.  However, ovsdb_idl_txn_increment()
1297  * will never (by itself) fail because of a verify error.
1298  *
1299  * The intended use is for incrementing the "next_cfg" column in the
1300  * Open_vSwitch table. */
1301 void
1302 ovsdb_idl_txn_increment(struct ovsdb_idl_txn *txn,
1303                         const struct ovsdb_idl_row *row,
1304                         const struct ovsdb_idl_column *column)
1305 {
1306     ovs_assert(!txn->inc_table);
1307     ovs_assert(column->type.key.type == OVSDB_TYPE_INTEGER);
1308     ovs_assert(column->type.value.type == OVSDB_TYPE_VOID);
1309
1310     txn->inc_table = row->table->class->name;
1311     txn->inc_column = column->name;
1312     txn->inc_row = row->uuid;
1313 }
1314
1315 /* Destroys 'txn' and frees all associated memory.  If ovsdb_idl_txn_commit()
1316  * has been called for 'txn' but the commit is still incomplete (that is, the
1317  * last call returned TXN_INCOMPLETE) then the transaction may or may not still
1318  * end up committing at the database server, but the client will not be able to
1319  * get any further status information back. */
1320 void
1321 ovsdb_idl_txn_destroy(struct ovsdb_idl_txn *txn)
1322 {
1323     struct ovsdb_idl_txn_insert *insert, *next;
1324
1325     json_destroy(txn->request_id);
1326     if (txn->status == TXN_INCOMPLETE) {
1327         hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
1328     }
1329     ovsdb_idl_txn_abort(txn);
1330     ds_destroy(&txn->comment);
1331     free(txn->error);
1332     HMAP_FOR_EACH_SAFE (insert, next, hmap_node, &txn->inserted_rows) {
1333         free(insert);
1334     }
1335     hmap_destroy(&txn->inserted_rows);
1336     free(txn);
1337 }
1338
1339 /* Causes poll_block() to wake up if 'txn' has completed committing. */
1340 void
1341 ovsdb_idl_txn_wait(const struct ovsdb_idl_txn *txn)
1342 {
1343     if (txn->status != TXN_UNCOMMITTED && txn->status != TXN_INCOMPLETE) {
1344         poll_immediate_wake();
1345     }
1346 }
1347
1348 static struct json *
1349 where_uuid_equals(const struct uuid *uuid)
1350 {
1351     return
1352         json_array_create_1(
1353             json_array_create_3(
1354                 json_string_create("_uuid"),
1355                 json_string_create("=="),
1356                 json_array_create_2(
1357                     json_string_create("uuid"),
1358                     json_string_create_nocopy(
1359                         xasprintf(UUID_FMT, UUID_ARGS(uuid))))));
1360 }
1361
1362 static char *
1363 uuid_name_from_uuid(const struct uuid *uuid)
1364 {
1365     char *name;
1366     char *p;
1367
1368     name = xasprintf("row"UUID_FMT, UUID_ARGS(uuid));
1369     for (p = name; *p != '\0'; p++) {
1370         if (*p == '-') {
1371             *p = '_';
1372         }
1373     }
1374
1375     return name;
1376 }
1377
1378 static const struct ovsdb_idl_row *
1379 ovsdb_idl_txn_get_row(const struct ovsdb_idl_txn *txn, const struct uuid *uuid)
1380 {
1381     const struct ovsdb_idl_row *row;
1382
1383     HMAP_FOR_EACH_WITH_HASH (row, txn_node, uuid_hash(uuid), &txn->txn_rows) {
1384         if (uuid_equals(&row->uuid, uuid)) {
1385             return row;
1386         }
1387     }
1388     return NULL;
1389 }
1390
1391 /* XXX there must be a cleaner way to do this */
1392 static struct json *
1393 substitute_uuids(struct json *json, const struct ovsdb_idl_txn *txn)
1394 {
1395     if (json->type == JSON_ARRAY) {
1396         struct uuid uuid;
1397         size_t i;
1398
1399         if (json->u.array.n == 2
1400             && json->u.array.elems[0]->type == JSON_STRING
1401             && json->u.array.elems[1]->type == JSON_STRING
1402             && !strcmp(json->u.array.elems[0]->u.string, "uuid")
1403             && uuid_from_string(&uuid, json->u.array.elems[1]->u.string)) {
1404             const struct ovsdb_idl_row *row;
1405
1406             row = ovsdb_idl_txn_get_row(txn, &uuid);
1407             if (row && !row->old && row->new) {
1408                 json_destroy(json);
1409
1410                 return json_array_create_2(
1411                     json_string_create("named-uuid"),
1412                     json_string_create_nocopy(uuid_name_from_uuid(&uuid)));
1413             }
1414         }
1415
1416         for (i = 0; i < json->u.array.n; i++) {
1417             json->u.array.elems[i] = substitute_uuids(json->u.array.elems[i],
1418                                                       txn);
1419         }
1420     } else if (json->type == JSON_OBJECT) {
1421         struct shash_node *node;
1422
1423         SHASH_FOR_EACH (node, json_object(json)) {
1424             node->data = substitute_uuids(node->data, txn);
1425         }
1426     }
1427     return json;
1428 }
1429
1430 static void
1431 ovsdb_idl_txn_disassemble(struct ovsdb_idl_txn *txn)
1432 {
1433     struct ovsdb_idl_row *row, *next;
1434
1435     /* This must happen early.  Otherwise, ovsdb_idl_row_parse() will call an
1436      * ovsdb_idl_column's 'parse' function, which will call
1437      * ovsdb_idl_get_row_arc(), which will seen that the IDL is in a
1438      * transaction and fail to update the graph.  */
1439     txn->idl->txn = NULL;
1440
1441     HMAP_FOR_EACH_SAFE (row, next, txn_node, &txn->txn_rows) {
1442         if (row->old) {
1443             if (row->written) {
1444                 ovsdb_idl_row_unparse(row);
1445                 ovsdb_idl_row_clear_arcs(row, false);
1446                 ovsdb_idl_row_parse(row);
1447             }
1448         } else {
1449             ovsdb_idl_row_unparse(row);
1450         }
1451         ovsdb_idl_row_clear_new(row);
1452
1453         free(row->prereqs);
1454         row->prereqs = NULL;
1455
1456         free(row->written);
1457         row->written = NULL;
1458
1459         hmap_remove(&txn->txn_rows, &row->txn_node);
1460         hmap_node_nullify(&row->txn_node);
1461         if (!row->old) {
1462             hmap_remove(&row->table->rows, &row->hmap_node);
1463             free(row);
1464         }
1465     }
1466     hmap_destroy(&txn->txn_rows);
1467     hmap_init(&txn->txn_rows);
1468 }
1469
1470 /* Attempts to commit 'txn'.  Returns the status of the commit operation, one
1471  * of the following TXN_* constants:
1472  *
1473  *   TXN_INCOMPLETE:
1474  *
1475  *       The transaction is in progress, but not yet complete.  The caller
1476  *       should call again later, after calling ovsdb_idl_run() to let the IDL
1477  *       do OVSDB protocol processing.
1478  *
1479  *   TXN_UNCHANGED:
1480  *
1481  *       The transaction is complete.  (It didn't actually change the database,
1482  *       so the IDL didn't send any request to the database server.)
1483  *
1484  *   TXN_ABORTED:
1485  *
1486  *       The caller previously called ovsdb_idl_txn_abort().
1487  *
1488  *   TXN_SUCCESS:
1489  *
1490  *       The transaction was successful.  The update made by the transaction
1491  *       (and possibly other changes made by other database clients) should
1492  *       already be visible in the IDL.
1493  *
1494  *   TXN_TRY_AGAIN:
1495  *
1496  *       The transaction failed for some transient reason, e.g. because a
1497  *       "verify" operation reported an inconsistency or due to a network
1498  *       problem.  The caller should wait for a change to the database, then
1499  *       compose a new transaction, and commit the new transaction.
1500  *
1501  *       Use the return value of ovsdb_idl_get_seqno() to wait for a change in
1502  *       the database.  It is important to use its return value *before* the
1503  *       initial call to ovsdb_idl_txn_commit() as the baseline for this
1504  *       purpose, because the change that one should wait for can happen after
1505  *       the initial call but before the call that returns TXN_TRY_AGAIN, and
1506  *       using some other baseline value in that situation could cause an
1507  *       indefinite wait if the database rarely changes.
1508  *
1509  *   TXN_NOT_LOCKED:
1510  *
1511  *       The transaction failed because the IDL has been configured to require
1512  *       a database lock (with ovsdb_idl_set_lock()) but didn't get it yet or
1513  *       has already lost it.
1514  *
1515  * Committing a transaction rolls back all of the changes that it made to the
1516  * IDL's copy of the database.  If the transaction commits successfully, then
1517  * the database server will send an update and, thus, the IDL will be updated
1518  * with the committed changes. */
1519 enum ovsdb_idl_txn_status
1520 ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
1521 {
1522     struct ovsdb_idl_row *row;
1523     struct json *operations;
1524     bool any_updates;
1525
1526     if (txn != txn->idl->txn) {
1527         return txn->status;
1528     }
1529
1530     /* If we need a lock but don't have it, give up quickly. */
1531     if (txn->idl->lock_name && !ovsdb_idl_has_lock(txn->idl)) {
1532         txn->status = TXN_NOT_LOCKED;
1533         ovsdb_idl_txn_disassemble(txn);
1534         return txn->status;
1535     }
1536
1537     operations = json_array_create_1(
1538         json_string_create(txn->idl->class->database));
1539
1540     /* Assert that we have the required lock (avoiding a race). */
1541     if (txn->idl->lock_name) {
1542         struct json *op = json_object_create();
1543         json_array_add(operations, op);
1544         json_object_put_string(op, "op", "assert");
1545         json_object_put_string(op, "lock", txn->idl->lock_name);
1546     }
1547
1548     /* Add prerequisites and declarations of new rows. */
1549     HMAP_FOR_EACH (row, txn_node, &txn->txn_rows) {
1550         /* XXX check that deleted rows exist even if no prereqs? */
1551         if (row->prereqs) {
1552             const struct ovsdb_idl_table_class *class = row->table->class;
1553             size_t n_columns = class->n_columns;
1554             struct json *op, *columns, *row_json;
1555             size_t idx;
1556
1557             op = json_object_create();
1558             json_array_add(operations, op);
1559             json_object_put_string(op, "op", "wait");
1560             json_object_put_string(op, "table", class->name);
1561             json_object_put(op, "timeout", json_integer_create(0));
1562             json_object_put(op, "where", where_uuid_equals(&row->uuid));
1563             json_object_put_string(op, "until", "==");
1564             columns = json_array_create_empty();
1565             json_object_put(op, "columns", columns);
1566             row_json = json_object_create();
1567             json_object_put(op, "rows", json_array_create_1(row_json));
1568
1569             BITMAP_FOR_EACH_1 (idx, n_columns, row->prereqs) {
1570                 const struct ovsdb_idl_column *column = &class->columns[idx];
1571                 json_array_add(columns, json_string_create(column->name));
1572                 json_object_put(row_json, column->name,
1573                                 ovsdb_datum_to_json(&row->old[idx],
1574                                                     &column->type));
1575             }
1576         }
1577     }
1578
1579     /* Add updates. */
1580     any_updates = false;
1581     HMAP_FOR_EACH (row, txn_node, &txn->txn_rows) {
1582         const struct ovsdb_idl_table_class *class = row->table->class;
1583
1584         if (!row->new) {
1585             if (class->is_root) {
1586                 struct json *op = json_object_create();
1587                 json_object_put_string(op, "op", "delete");
1588                 json_object_put_string(op, "table", class->name);
1589                 json_object_put(op, "where", where_uuid_equals(&row->uuid));
1590                 json_array_add(operations, op);
1591                 any_updates = true;
1592             } else {
1593                 /* Let ovsdb-server decide whether to really delete it. */
1594             }
1595         } else if (row->old != row->new) {
1596             struct json *row_json;
1597             struct json *op;
1598             size_t idx;
1599
1600             op = json_object_create();
1601             json_object_put_string(op, "op", row->old ? "update" : "insert");
1602             json_object_put_string(op, "table", class->name);
1603             if (row->old) {
1604                 json_object_put(op, "where", where_uuid_equals(&row->uuid));
1605             } else {
1606                 struct ovsdb_idl_txn_insert *insert;
1607
1608                 any_updates = true;
1609
1610                 json_object_put(op, "uuid-name",
1611                                 json_string_create_nocopy(
1612                                     uuid_name_from_uuid(&row->uuid)));
1613
1614                 insert = xmalloc(sizeof *insert);
1615                 insert->dummy = row->uuid;
1616                 insert->op_index = operations->u.array.n - 1;
1617                 uuid_zero(&insert->real);
1618                 hmap_insert(&txn->inserted_rows, &insert->hmap_node,
1619                             uuid_hash(&insert->dummy));
1620             }
1621             row_json = json_object_create();
1622             json_object_put(op, "row", row_json);
1623
1624             if (row->written) {
1625                 BITMAP_FOR_EACH_1 (idx, class->n_columns, row->written) {
1626                     const struct ovsdb_idl_column *column =
1627                                                         &class->columns[idx];
1628
1629                     if (row->old
1630                         || !ovsdb_datum_is_default(&row->new[idx],
1631                                                   &column->type)) {
1632                         json_object_put(row_json, column->name,
1633                                         substitute_uuids(
1634                                             ovsdb_datum_to_json(&row->new[idx],
1635                                                                 &column->type),
1636                                             txn));
1637
1638                         /* If anything really changed, consider it an update.
1639                          * We can't suppress not-really-changed values earlier
1640                          * or transactions would become nonatomic (see the big
1641                          * comment inside ovsdb_idl_txn_write()). */
1642                         if (!any_updates && row->old &&
1643                             !ovsdb_datum_equals(&row->old[idx], &row->new[idx],
1644                                                 &column->type)) {
1645                             any_updates = true;
1646                         }
1647                     }
1648                 }
1649             }
1650
1651             if (!row->old || !shash_is_empty(json_object(row_json))) {
1652                 json_array_add(operations, op);
1653             } else {
1654                 json_destroy(op);
1655             }
1656         }
1657     }
1658
1659     /* Add increment. */
1660     if (txn->inc_table && any_updates) {
1661         struct json *op;
1662
1663         txn->inc_index = operations->u.array.n - 1;
1664
1665         op = json_object_create();
1666         json_object_put_string(op, "op", "mutate");
1667         json_object_put_string(op, "table", txn->inc_table);
1668         json_object_put(op, "where",
1669                         substitute_uuids(where_uuid_equals(&txn->inc_row),
1670                                          txn));
1671         json_object_put(op, "mutations",
1672                         json_array_create_1(
1673                             json_array_create_3(
1674                                 json_string_create(txn->inc_column),
1675                                 json_string_create("+="),
1676                                 json_integer_create(1))));
1677         json_array_add(operations, op);
1678
1679         op = json_object_create();
1680         json_object_put_string(op, "op", "select");
1681         json_object_put_string(op, "table", txn->inc_table);
1682         json_object_put(op, "where",
1683                         substitute_uuids(where_uuid_equals(&txn->inc_row),
1684                                          txn));
1685         json_object_put(op, "columns",
1686                         json_array_create_1(json_string_create(
1687                                                 txn->inc_column)));
1688         json_array_add(operations, op);
1689     }
1690
1691     if (txn->comment.length) {
1692         struct json *op = json_object_create();
1693         json_object_put_string(op, "op", "comment");
1694         json_object_put_string(op, "comment", ds_cstr(&txn->comment));
1695         json_array_add(operations, op);
1696     }
1697
1698     if (txn->dry_run) {
1699         struct json *op = json_object_create();
1700         json_object_put_string(op, "op", "abort");
1701         json_array_add(operations, op);
1702     }
1703
1704     if (!any_updates) {
1705         txn->status = TXN_UNCHANGED;
1706         json_destroy(operations);
1707     } else if (!jsonrpc_session_send(
1708                    txn->idl->session,
1709                    jsonrpc_create_request(
1710                        "transact", operations, &txn->request_id))) {
1711         hmap_insert(&txn->idl->outstanding_txns, &txn->hmap_node,
1712                     json_hash(txn->request_id, 0));
1713         txn->status = TXN_INCOMPLETE;
1714     } else {
1715         txn->status = TXN_TRY_AGAIN;
1716     }
1717
1718     ovsdb_idl_txn_disassemble(txn);
1719     return txn->status;
1720 }
1721
1722 /* Attempts to commit 'txn', blocking until the commit either succeeds or
1723  * fails.  Returns the final commit status, which may be any TXN_* value other
1724  * than TXN_INCOMPLETE.
1725  *
1726  * This function calls ovsdb_idl_run() on 'txn''s IDL, so it may cause the
1727  * return value of ovsdb_idl_get_seqno() to change. */
1728 enum ovsdb_idl_txn_status
1729 ovsdb_idl_txn_commit_block(struct ovsdb_idl_txn *txn)
1730 {
1731     enum ovsdb_idl_txn_status status;
1732
1733     fatal_signal_run();
1734     while ((status = ovsdb_idl_txn_commit(txn)) == TXN_INCOMPLETE) {
1735         ovsdb_idl_run(txn->idl);
1736         ovsdb_idl_wait(txn->idl);
1737         ovsdb_idl_txn_wait(txn);
1738         poll_block();
1739     }
1740     return status;
1741 }
1742
1743 /* Returns the final (incremented) value of the column in 'txn' that was set to
1744  * be incremented by ovsdb_idl_txn_increment().  'txn' must have committed
1745  * successfully. */
1746 int64_t
1747 ovsdb_idl_txn_get_increment_new_value(const struct ovsdb_idl_txn *txn)
1748 {
1749     ovs_assert(txn->status == TXN_SUCCESS);
1750     return txn->inc_new_value;
1751 }
1752
1753 /* Aborts 'txn' without sending it to the database server.  This is effective
1754  * only if ovsdb_idl_txn_commit() has not yet been called for 'txn'.
1755  * Otherwise, it has no effect.
1756  *
1757  * Aborting a transaction doesn't free its memory.  Use
1758  * ovsdb_idl_txn_destroy() to do that. */
1759 void
1760 ovsdb_idl_txn_abort(struct ovsdb_idl_txn *txn)
1761 {
1762     ovsdb_idl_txn_disassemble(txn);
1763     if (txn->status == TXN_UNCOMMITTED || txn->status == TXN_INCOMPLETE) {
1764         txn->status = TXN_ABORTED;
1765     }
1766 }
1767
1768 /* Returns a string that reports the error status for 'txn'.  The caller must
1769  * not modify or free the returned string.  A call to ovsdb_idl_txn_destroy()
1770  * for 'txn' may free the returned string.
1771  *
1772  * The return value is ordinarily one of the strings that
1773  * ovsdb_idl_txn_status_to_string() would return, but if the transaction failed
1774  * due to an error reported by the database server, the return value is that
1775  * error. */
1776 const char *
1777 ovsdb_idl_txn_get_error(const struct ovsdb_idl_txn *txn)
1778 {
1779     if (txn->status != TXN_ERROR) {
1780         return ovsdb_idl_txn_status_to_string(txn->status);
1781     } else if (txn->error) {
1782         return txn->error;
1783     } else {
1784         return "no error details available";
1785     }
1786 }
1787
1788 static void
1789 ovsdb_idl_txn_set_error_json(struct ovsdb_idl_txn *txn,
1790                              const struct json *json)
1791 {
1792     if (txn->error == NULL) {
1793         txn->error = json_to_string(json, JSSF_SORT);
1794     }
1795 }
1796
1797 /* For transaction 'txn' that completed successfully, finds and returns the
1798  * permanent UUID that the database assigned to a newly inserted row, given the
1799  * 'uuid' that ovsdb_idl_txn_insert() assigned locally to that row.
1800  *
1801  * Returns NULL if 'uuid' is not a UUID assigned by ovsdb_idl_txn_insert() or
1802  * if it was assigned by that function and then deleted by
1803  * ovsdb_idl_txn_delete() within the same transaction.  (Rows that are inserted
1804  * and then deleted within a single transaction are never sent to the database
1805  * server, so it never assigns them a permanent UUID.) */
1806 const struct uuid *
1807 ovsdb_idl_txn_get_insert_uuid(const struct ovsdb_idl_txn *txn,
1808                               const struct uuid *uuid)
1809 {
1810     const struct ovsdb_idl_txn_insert *insert;
1811
1812     ovs_assert(txn->status == TXN_SUCCESS || txn->status == TXN_UNCHANGED);
1813     HMAP_FOR_EACH_IN_BUCKET (insert, hmap_node,
1814                              uuid_hash(uuid), &txn->inserted_rows) {
1815         if (uuid_equals(uuid, &insert->dummy)) {
1816             return &insert->real;
1817         }
1818     }
1819     return NULL;
1820 }
1821
1822 static void
1823 ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
1824                        enum ovsdb_idl_txn_status status)
1825 {
1826     txn->status = status;
1827     hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
1828 }
1829
1830 /* Writes 'datum' to the specified 'column' in 'row_'.  Updates both 'row_'
1831  * itself and the structs derived from it (e.g. the "struct ovsrec_*", for
1832  * ovs-vswitchd).
1833  *
1834  * 'datum' must have the correct type for its column.  The IDL does not check
1835  * that it meets schema constraints, but ovsdb-server will do so at commit time
1836  * so it had better be correct.
1837  *
1838  * A transaction must be in progress.  Replication of 'column' must not have
1839  * been disabled (by calling ovsdb_idl_omit()).
1840  *
1841  * Usually this function is used indirectly through one of the "set" functions
1842  * generated by ovsdb-idlc.
1843  *
1844  * Takes ownership of what 'datum' points to (and in some cases destroys that
1845  * data before returning) but makes a copy of 'datum' itself.  (Commonly
1846  * 'datum' is on the caller's stack.) */
1847 static void
1848 ovsdb_idl_txn_write__(const struct ovsdb_idl_row *row_,
1849                       const struct ovsdb_idl_column *column,
1850                       struct ovsdb_datum *datum, bool owns_datum)
1851 {
1852     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
1853     const struct ovsdb_idl_table_class *class;
1854     size_t column_idx;
1855     bool write_only;
1856
1857     if (ovsdb_idl_row_is_synthetic(row)) {
1858         goto discard_datum;
1859     }
1860
1861     class = row->table->class;
1862     column_idx = column - class->columns;
1863     write_only = row->table->modes[column_idx] == OVSDB_IDL_MONITOR;
1864
1865     ovs_assert(row->new != NULL);
1866     ovs_assert(column_idx < class->n_columns);
1867     ovs_assert(row->old == NULL ||
1868                row->table->modes[column_idx] & OVSDB_IDL_MONITOR);
1869
1870     if (row->table->idl->verify_write_only && !write_only) {
1871         VLOG_ERR("Bug: Attempt to write to a read/write column (%s:%s) when"
1872                  " explicitly configured not to.", class->name, column->name);
1873         goto discard_datum;
1874     }
1875
1876     /* If this is a write-only column and the datum being written is the same
1877      * as the one already there, just skip the update entirely.  This is worth
1878      * optimizing because we have a lot of columns that get periodically
1879      * refreshed into the database but don't actually change that often.
1880      *
1881      * We don't do this for read/write columns because that would break
1882      * atomicity of transactions--some other client might have written a
1883      * different value in that column since we read it.  (But if a whole
1884      * transaction only does writes of existing values, without making any real
1885      * changes, we will drop the whole transaction later in
1886      * ovsdb_idl_txn_commit().) */
1887     if (write_only && ovsdb_datum_equals(ovsdb_idl_read(row, column),
1888                                          datum, &column->type)) {
1889         goto discard_datum;
1890     }
1891
1892     if (hmap_node_is_null(&row->txn_node)) {
1893         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
1894                     uuid_hash(&row->uuid));
1895     }
1896     if (row->old == row->new) {
1897         row->new = xmalloc(class->n_columns * sizeof *row->new);
1898     }
1899     if (!row->written) {
1900         row->written = bitmap_allocate(class->n_columns);
1901     }
1902     if (bitmap_is_set(row->written, column_idx)) {
1903         ovsdb_datum_destroy(&row->new[column_idx], &column->type);
1904     } else {
1905         bitmap_set1(row->written, column_idx);
1906     }
1907     if (owns_datum) {
1908         row->new[column_idx] = *datum;
1909     } else {
1910         ovsdb_datum_clone(&row->new[column_idx], datum, &column->type);
1911     }
1912     (column->unparse)(row);
1913     (column->parse)(row, &row->new[column_idx]);
1914     return;
1915
1916 discard_datum:
1917     if (owns_datum) {
1918         ovsdb_datum_destroy(datum, &column->type);
1919     }
1920 }
1921
1922 void
1923 ovsdb_idl_txn_write(const struct ovsdb_idl_row *row,
1924                     const struct ovsdb_idl_column *column,
1925                     struct ovsdb_datum *datum)
1926 {
1927     ovsdb_idl_txn_write__(row, column, datum, true);
1928 }
1929
1930 void
1931 ovsdb_idl_txn_write_clone(const struct ovsdb_idl_row *row,
1932                           const struct ovsdb_idl_column *column,
1933                           const struct ovsdb_datum *datum)
1934 {
1935     ovsdb_idl_txn_write__(row, column,
1936                           CONST_CAST(struct ovsdb_datum *, datum), false);
1937 }
1938
1939 /* Causes the original contents of 'column' in 'row_' to be verified as a
1940  * prerequisite to completing the transaction.  That is, if 'column' in 'row_'
1941  * changed (or if 'row_' was deleted) between the time that the IDL originally
1942  * read its contents and the time that the transaction commits, then the
1943  * transaction aborts and ovsdb_idl_txn_commit() returns TXN_AGAIN_WAIT or
1944  * TXN_AGAIN_NOW (depending on whether the database change has already been
1945  * received).
1946  *
1947  * The intention is that, to ensure that no transaction commits based on dirty
1948  * reads, an application should call ovsdb_idl_txn_verify() on each data item
1949  * read as part of a read-modify-write operation.
1950  *
1951  * In some cases ovsdb_idl_txn_verify() reduces to a no-op, because the current
1952  * value of 'column' is already known:
1953  *
1954  *   - If 'row_' is a row created by the current transaction (returned by
1955  *     ovsdb_idl_txn_insert()).
1956  *
1957  *   - If 'column' has already been modified (with ovsdb_idl_txn_write())
1958  *     within the current transaction.
1959  *
1960  * Because of the latter property, always call ovsdb_idl_txn_verify() *before*
1961  * ovsdb_idl_txn_write() for a given read-modify-write.
1962  *
1963  * A transaction must be in progress.
1964  *
1965  * Usually this function is used indirectly through one of the "verify"
1966  * functions generated by ovsdb-idlc. */
1967 void
1968 ovsdb_idl_txn_verify(const struct ovsdb_idl_row *row_,
1969                      const struct ovsdb_idl_column *column)
1970 {
1971     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
1972     const struct ovsdb_idl_table_class *class;
1973     size_t column_idx;
1974
1975     if (ovsdb_idl_row_is_synthetic(row)) {
1976         return;
1977     }
1978
1979     class = row->table->class;
1980     column_idx = column - class->columns;
1981
1982     ovs_assert(row->new != NULL);
1983     ovs_assert(row->old == NULL ||
1984                row->table->modes[column_idx] & OVSDB_IDL_MONITOR);
1985     if (!row->old
1986         || (row->written && bitmap_is_set(row->written, column_idx))) {
1987         return;
1988     }
1989
1990     if (hmap_node_is_null(&row->txn_node)) {
1991         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
1992                     uuid_hash(&row->uuid));
1993     }
1994     if (!row->prereqs) {
1995         row->prereqs = bitmap_allocate(class->n_columns);
1996     }
1997     bitmap_set1(row->prereqs, column_idx);
1998 }
1999
2000 /* Deletes 'row_' from its table.  May free 'row_', so it must not be
2001  * accessed afterward.
2002  *
2003  * A transaction must be in progress.
2004  *
2005  * Usually this function is used indirectly through one of the "delete"
2006  * functions generated by ovsdb-idlc. */
2007 void
2008 ovsdb_idl_txn_delete(const struct ovsdb_idl_row *row_)
2009 {
2010     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
2011
2012     if (ovsdb_idl_row_is_synthetic(row)) {
2013         return;
2014     }
2015
2016     ovs_assert(row->new != NULL);
2017     if (!row->old) {
2018         ovsdb_idl_row_unparse(row);
2019         ovsdb_idl_row_clear_new(row);
2020         ovs_assert(!row->prereqs);
2021         hmap_remove(&row->table->rows, &row->hmap_node);
2022         hmap_remove(&row->table->idl->txn->txn_rows, &row->txn_node);
2023         free(row);
2024         return;
2025     }
2026     if (hmap_node_is_null(&row->txn_node)) {
2027         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
2028                     uuid_hash(&row->uuid));
2029     }
2030     ovsdb_idl_row_clear_new(row);
2031     row->new = NULL;
2032 }
2033
2034 /* Inserts and returns a new row in the table with the specified 'class' in the
2035  * database with open transaction 'txn'.
2036  *
2037  * The new row is assigned a provisional UUID.  If 'uuid' is null then one is
2038  * randomly generated; otherwise 'uuid' should specify a randomly generated
2039  * UUID not otherwise in use.  ovsdb-server will assign a different UUID when
2040  * 'txn' is committed, but the IDL will replace any uses of the provisional
2041  * UUID in the data to be to be committed by the UUID assigned by
2042  * ovsdb-server.
2043  *
2044  * Usually this function is used indirectly through one of the "insert"
2045  * functions generated by ovsdb-idlc. */
2046 const struct ovsdb_idl_row *
2047 ovsdb_idl_txn_insert(struct ovsdb_idl_txn *txn,
2048                      const struct ovsdb_idl_table_class *class,
2049                      const struct uuid *uuid)
2050 {
2051     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(class);
2052
2053     if (uuid) {
2054         ovs_assert(!ovsdb_idl_txn_get_row(txn, uuid));
2055         row->uuid = *uuid;
2056     } else {
2057         uuid_generate(&row->uuid);
2058     }
2059
2060     row->table = ovsdb_idl_table_from_class(txn->idl, class);
2061     row->new = xmalloc(class->n_columns * sizeof *row->new);
2062     hmap_insert(&row->table->rows, &row->hmap_node, uuid_hash(&row->uuid));
2063     hmap_insert(&txn->txn_rows, &row->txn_node, uuid_hash(&row->uuid));
2064     return row;
2065 }
2066
2067 static void
2068 ovsdb_idl_txn_abort_all(struct ovsdb_idl *idl)
2069 {
2070     struct ovsdb_idl_txn *txn;
2071
2072     HMAP_FOR_EACH (txn, hmap_node, &idl->outstanding_txns) {
2073         ovsdb_idl_txn_complete(txn, TXN_TRY_AGAIN);
2074     }
2075 }
2076
2077 static struct ovsdb_idl_txn *
2078 ovsdb_idl_txn_find(struct ovsdb_idl *idl, const struct json *id)
2079 {
2080     struct ovsdb_idl_txn *txn;
2081
2082     HMAP_FOR_EACH_WITH_HASH (txn, hmap_node,
2083                              json_hash(id, 0), &idl->outstanding_txns) {
2084         if (json_equal(id, txn->request_id)) {
2085             return txn;
2086         }
2087     }
2088     return NULL;
2089 }
2090
2091 static bool
2092 check_json_type(const struct json *json, enum json_type type, const char *name)
2093 {
2094     if (!json) {
2095         VLOG_WARN_RL(&syntax_rl, "%s is missing", name);
2096         return false;
2097     } else if (json->type != type) {
2098         VLOG_WARN_RL(&syntax_rl, "%s is %s instead of %s",
2099                      name, json_type_to_string(json->type),
2100                      json_type_to_string(type));
2101         return false;
2102     } else {
2103         return true;
2104     }
2105 }
2106
2107 static bool
2108 ovsdb_idl_txn_process_inc_reply(struct ovsdb_idl_txn *txn,
2109                                 const struct json_array *results)
2110 {
2111     struct json *count, *rows, *row, *column;
2112     struct shash *mutate, *select;
2113
2114     if (txn->inc_index + 2 > results->n) {
2115         VLOG_WARN_RL(&syntax_rl, "reply does not contain enough operations "
2116                      "for increment (has %"PRIuSIZE", needs %u)",
2117                      results->n, txn->inc_index + 2);
2118         return false;
2119     }
2120
2121     /* We know that this is a JSON object because the loop in
2122      * ovsdb_idl_txn_process_reply() checked. */
2123     mutate = json_object(results->elems[txn->inc_index]);
2124     count = shash_find_data(mutate, "count");
2125     if (!check_json_type(count, JSON_INTEGER, "\"mutate\" reply \"count\"")) {
2126         return false;
2127     }
2128     if (count->u.integer != 1) {
2129         VLOG_WARN_RL(&syntax_rl,
2130                      "\"mutate\" reply \"count\" is %lld instead of 1",
2131                      count->u.integer);
2132         return false;
2133     }
2134
2135     select = json_object(results->elems[txn->inc_index + 1]);
2136     rows = shash_find_data(select, "rows");
2137     if (!check_json_type(rows, JSON_ARRAY, "\"select\" reply \"rows\"")) {
2138         return false;
2139     }
2140     if (rows->u.array.n != 1) {
2141         VLOG_WARN_RL(&syntax_rl, "\"select\" reply \"rows\" has %"PRIuSIZE" elements "
2142                      "instead of 1",
2143                      rows->u.array.n);
2144         return false;
2145     }
2146     row = rows->u.array.elems[0];
2147     if (!check_json_type(row, JSON_OBJECT, "\"select\" reply row")) {
2148         return false;
2149     }
2150     column = shash_find_data(json_object(row), txn->inc_column);
2151     if (!check_json_type(column, JSON_INTEGER,
2152                          "\"select\" reply inc column")) {
2153         return false;
2154     }
2155     txn->inc_new_value = column->u.integer;
2156     return true;
2157 }
2158
2159 static bool
2160 ovsdb_idl_txn_process_insert_reply(struct ovsdb_idl_txn_insert *insert,
2161                                    const struct json_array *results)
2162 {
2163     static const struct ovsdb_base_type uuid_type = OVSDB_BASE_UUID_INIT;
2164     struct ovsdb_error *error;
2165     struct json *json_uuid;
2166     union ovsdb_atom uuid;
2167     struct shash *reply;
2168
2169     if (insert->op_index >= results->n) {
2170         VLOG_WARN_RL(&syntax_rl, "reply does not contain enough operations "
2171                      "for insert (has %"PRIuSIZE", needs %u)",
2172                      results->n, insert->op_index);
2173         return false;
2174     }
2175
2176     /* We know that this is a JSON object because the loop in
2177      * ovsdb_idl_txn_process_reply() checked. */
2178     reply = json_object(results->elems[insert->op_index]);
2179     json_uuid = shash_find_data(reply, "uuid");
2180     if (!check_json_type(json_uuid, JSON_ARRAY, "\"insert\" reply \"uuid\"")) {
2181         return false;
2182     }
2183
2184     error = ovsdb_atom_from_json(&uuid, &uuid_type, json_uuid, NULL);
2185     if (error) {
2186         char *s = ovsdb_error_to_string(error);
2187         VLOG_WARN_RL(&syntax_rl, "\"insert\" reply \"uuid\" is not a JSON "
2188                      "UUID: %s", s);
2189         free(s);
2190         ovsdb_error_destroy(error);
2191         return false;
2192     }
2193
2194     insert->real = uuid.uuid;
2195
2196     return true;
2197 }
2198
2199 static bool
2200 ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
2201                             const struct jsonrpc_msg *msg)
2202 {
2203     struct ovsdb_idl_txn *txn;
2204     enum ovsdb_idl_txn_status status;
2205
2206     txn = ovsdb_idl_txn_find(idl, msg->id);
2207     if (!txn) {
2208         return false;
2209     }
2210
2211     if (msg->type == JSONRPC_ERROR) {
2212         status = TXN_ERROR;
2213     } else if (msg->result->type != JSON_ARRAY) {
2214         VLOG_WARN_RL(&syntax_rl, "reply to \"transact\" is not JSON array");
2215         status = TXN_ERROR;
2216     } else {
2217         struct json_array *ops = &msg->result->u.array;
2218         int hard_errors = 0;
2219         int soft_errors = 0;
2220         int lock_errors = 0;
2221         size_t i;
2222
2223         for (i = 0; i < ops->n; i++) {
2224             struct json *op = ops->elems[i];
2225
2226             if (op->type == JSON_NULL) {
2227                 /* This isn't an error in itself but indicates that some prior
2228                  * operation failed, so make sure that we know about it. */
2229                 soft_errors++;
2230             } else if (op->type == JSON_OBJECT) {
2231                 struct json *error;
2232
2233                 error = shash_find_data(json_object(op), "error");
2234                 if (error) {
2235                     if (error->type == JSON_STRING) {
2236                         if (!strcmp(error->u.string, "timed out")) {
2237                             soft_errors++;
2238                         } else if (!strcmp(error->u.string, "not owner")) {
2239                             lock_errors++;
2240                         } else if (strcmp(error->u.string, "aborted")) {
2241                             hard_errors++;
2242                             ovsdb_idl_txn_set_error_json(txn, op);
2243                         }
2244                     } else {
2245                         hard_errors++;
2246                         ovsdb_idl_txn_set_error_json(txn, op);
2247                         VLOG_WARN_RL(&syntax_rl,
2248                                      "\"error\" in reply is not JSON string");
2249                     }
2250                 }
2251             } else {
2252                 hard_errors++;
2253                 ovsdb_idl_txn_set_error_json(txn, op);
2254                 VLOG_WARN_RL(&syntax_rl,
2255                              "operation reply is not JSON null or object");
2256             }
2257         }
2258
2259         if (!soft_errors && !hard_errors && !lock_errors) {
2260             struct ovsdb_idl_txn_insert *insert;
2261
2262             if (txn->inc_table && !ovsdb_idl_txn_process_inc_reply(txn, ops)) {
2263                 hard_errors++;
2264             }
2265
2266             HMAP_FOR_EACH (insert, hmap_node, &txn->inserted_rows) {
2267                 if (!ovsdb_idl_txn_process_insert_reply(insert, ops)) {
2268                     hard_errors++;
2269                 }
2270             }
2271         }
2272
2273         status = (hard_errors ? TXN_ERROR
2274                   : lock_errors ? TXN_NOT_LOCKED
2275                   : soft_errors ? TXN_TRY_AGAIN
2276                   : TXN_SUCCESS);
2277     }
2278
2279     ovsdb_idl_txn_complete(txn, status);
2280     return true;
2281 }
2282
2283 /* Returns the transaction currently active for 'row''s IDL.  A transaction
2284  * must currently be active. */
2285 struct ovsdb_idl_txn *
2286 ovsdb_idl_txn_get(const struct ovsdb_idl_row *row)
2287 {
2288     struct ovsdb_idl_txn *txn = row->table->idl->txn;
2289     ovs_assert(txn != NULL);
2290     return txn;
2291 }
2292
2293 /* Returns the IDL on which 'txn' acts. */
2294 struct ovsdb_idl *
2295 ovsdb_idl_txn_get_idl (struct ovsdb_idl_txn *txn)
2296 {
2297     return txn->idl;
2298 }
2299 \f
2300 /* If 'lock_name' is nonnull, configures 'idl' to obtain the named lock from
2301  * the database server and to avoid modifying the database when the lock cannot
2302  * be acquired (that is, when another client has the same lock).
2303  *
2304  * If 'lock_name' is NULL, drops the locking requirement and releases the
2305  * lock. */
2306 void
2307 ovsdb_idl_set_lock(struct ovsdb_idl *idl, const char *lock_name)
2308 {
2309     ovs_assert(!idl->txn);
2310     ovs_assert(hmap_is_empty(&idl->outstanding_txns));
2311
2312     if (idl->lock_name && (!lock_name || strcmp(lock_name, idl->lock_name))) {
2313         /* Release previous lock. */
2314         ovsdb_idl_send_unlock_request(idl);
2315         free(idl->lock_name);
2316         idl->lock_name = NULL;
2317         idl->is_lock_contended = false;
2318     }
2319
2320     if (lock_name && !idl->lock_name) {
2321         /* Acquire new lock. */
2322         idl->lock_name = xstrdup(lock_name);
2323         ovsdb_idl_send_lock_request(idl);
2324     }
2325 }
2326
2327 /* Returns true if 'idl' is configured to obtain a lock and owns that lock.
2328  *
2329  * Locking and unlocking happens asynchronously from the database client's
2330  * point of view, so the information is only useful for optimization (e.g. if
2331  * the client doesn't have the lock then there's no point in trying to write to
2332  * the database). */
2333 bool
2334 ovsdb_idl_has_lock(const struct ovsdb_idl *idl)
2335 {
2336     return idl->has_lock;
2337 }
2338
2339 /* Returns true if 'idl' is configured to obtain a lock but the database server
2340  * has indicated that some other client already owns the requested lock. */
2341 bool
2342 ovsdb_idl_is_lock_contended(const struct ovsdb_idl *idl)
2343 {
2344     return idl->is_lock_contended;
2345 }
2346
2347 static void
2348 ovsdb_idl_update_has_lock(struct ovsdb_idl *idl, bool new_has_lock)
2349 {
2350     if (new_has_lock && !idl->has_lock) {
2351         if (!idl->monitor_request_id) {
2352             idl->change_seqno++;
2353         } else {
2354             /* We're waiting for a monitor reply, so don't signal that the
2355              * database changed.  The monitor reply will increment change_seqno
2356              * anyhow. */
2357         }
2358         idl->is_lock_contended = false;
2359     }
2360     idl->has_lock = new_has_lock;
2361 }
2362
2363 static void
2364 ovsdb_idl_send_lock_request__(struct ovsdb_idl *idl, const char *method,
2365                               struct json **idp)
2366 {
2367     ovsdb_idl_update_has_lock(idl, false);
2368
2369     json_destroy(idl->lock_request_id);
2370     idl->lock_request_id = NULL;
2371
2372     if (jsonrpc_session_is_connected(idl->session)) {
2373         struct json *params;
2374
2375         params = json_array_create_1(json_string_create(idl->lock_name));
2376         jsonrpc_session_send(idl->session,
2377                              jsonrpc_create_request(method, params, idp));
2378     }
2379 }
2380
2381 static void
2382 ovsdb_idl_send_lock_request(struct ovsdb_idl *idl)
2383 {
2384     ovsdb_idl_send_lock_request__(idl, "lock", &idl->lock_request_id);
2385 }
2386
2387 static void
2388 ovsdb_idl_send_unlock_request(struct ovsdb_idl *idl)
2389 {
2390     ovsdb_idl_send_lock_request__(idl, "unlock", NULL);
2391 }
2392
2393 static void
2394 ovsdb_idl_parse_lock_reply(struct ovsdb_idl *idl, const struct json *result)
2395 {
2396     bool got_lock;
2397
2398     json_destroy(idl->lock_request_id);
2399     idl->lock_request_id = NULL;
2400
2401     if (result->type == JSON_OBJECT) {
2402         const struct json *locked;
2403
2404         locked = shash_find_data(json_object(result), "locked");
2405         got_lock = locked && locked->type == JSON_TRUE;
2406     } else {
2407         got_lock = false;
2408     }
2409
2410     ovsdb_idl_update_has_lock(idl, got_lock);
2411     if (!got_lock) {
2412         idl->is_lock_contended = true;
2413     }
2414 }
2415
2416 static void
2417 ovsdb_idl_parse_lock_notify(struct ovsdb_idl *idl,
2418                             const struct json *params,
2419                             bool new_has_lock)
2420 {
2421     if (idl->lock_name
2422         && params->type == JSON_ARRAY
2423         && json_array(params)->n > 0
2424         && json_array(params)->elems[0]->type == JSON_STRING) {
2425         const char *lock_name = json_string(json_array(params)->elems[0]);
2426
2427         if (!strcmp(idl->lock_name, lock_name)) {
2428             ovsdb_idl_update_has_lock(idl, new_has_lock);
2429             if (!new_has_lock) {
2430                 idl->is_lock_contended = true;
2431             }
2432         }
2433     }
2434 }