netdev: Decouple creating and configuring network devices.
[sliver-openvswitch.git] / lib / ovsdb-idl.c
index b7ee097..2ac6c2f 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (c) 2009, 2010 Nicira Networks.
+/* Copyright (c) 2009, 2010, 2011 Nicira Networks.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -36,7 +36,7 @@
 #include "util.h"
 #include "vlog.h"
 
-VLOG_DEFINE_THIS_MODULE(ovsdb_idl)
+VLOG_DEFINE_THIS_MODULE(ovsdb_idl);
 
 /* An arc from one idl_row to another.  When row A contains a UUID that
  * references row B, this is represented by an arc from A (the source) to B
@@ -66,11 +66,17 @@ struct ovsdb_idl {
     const struct ovsdb_idl_class *class;
     struct jsonrpc_session *session;
     struct shash table_by_name;
-    struct ovsdb_idl_table *tables;
+    struct ovsdb_idl_table *tables; /* Contains "struct ovsdb_idl_table *"s.*/
     struct json *monitor_request_id;
     unsigned int last_monitor_request_seqno;
     unsigned int change_seqno;
 
+    /* Database locking. */
+    char *lock_name;            /* Name of lock we need, NULL if none. */
+    bool has_lock;              /* Has db server told us we have the lock? */
+    bool is_lock_contended;     /* Has db server told us we can't get lock? */
+    struct json *lock_request_id; /* JSON-RPC ID of in-flight lock request. */
+
     /* Transaction support. */
     struct ovsdb_idl_txn *txn;
     struct hmap outstanding_txns;
@@ -136,17 +142,41 @@ static void ovsdb_idl_txn_abort_all(struct ovsdb_idl *);
 static bool ovsdb_idl_txn_process_reply(struct ovsdb_idl *,
                                         const struct jsonrpc_msg *msg);
 
+static void ovsdb_idl_send_lock_request(struct ovsdb_idl *);
+static void ovsdb_idl_send_unlock_request(struct ovsdb_idl *);
+static void ovsdb_idl_parse_lock_reply(struct ovsdb_idl *,
+                                       const struct json *);
+static void ovsdb_idl_parse_lock_notify(struct ovsdb_idl *,
+                                        const struct json *params,
+                                        bool new_has_lock);
+
 /* Creates and returns a connection to database 'remote', which should be in a
  * form acceptable to jsonrpc_session_open().  The connection will maintain an
  * in-memory replica of the remote database whose schema is described by
  * 'class'.  (Ordinarily 'class' is compiled from an OVSDB schema automatically
- * by ovsdb-idlc.) */
+ * by ovsdb-idlc.)
+ *
+ * If 'monitor_everything_by_default' is true, then everything in the remote
+ * database will be replicated by default.  ovsdb_idl_omit() and
+ * ovsdb_idl_omit_alert() may be used to selectively drop some columns from
+ * monitoring.
+ *
+ * If 'monitor_everything_by_default' is false, then no columns or tables will
+ * be replicated by default.  ovsdb_idl_add_column() and ovsdb_idl_add_table()
+ * must be used to choose some columns or tables to replicate.
+ */
 struct ovsdb_idl *
-ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class)
+ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class,
+                 bool monitor_everything_by_default)
 {
     struct ovsdb_idl *idl;
+    uint8_t default_mode;
     size_t i;
 
+    default_mode = (monitor_everything_by_default
+                    ? OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT
+                    : 0);
+
     idl = xzalloc(sizeof *idl);
     idl->class = class;
     idl->session = jsonrpc_session_open(remote);
@@ -160,7 +190,8 @@ ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class)
         shash_add_assert(&idl->table_by_name, tc->name, table);
         table->class = tc;
         table->modes = xmalloc(tc->n_columns);
-        memset(table->modes, OVSDB_IDL_MODE_RW, tc->n_columns);
+        memset(table->modes, default_mode, tc->n_columns);
+        table->need_table = false;
         shash_init(&table->columns);
         for (j = 0; j < tc->n_columns; j++) {
             const struct ovsdb_idl_column *column = &tc->columns[j];
@@ -196,6 +227,8 @@ ovsdb_idl_destroy(struct ovsdb_idl *idl)
         shash_destroy(&idl->table_by_name);
         free(idl->tables);
         json_destroy(idl->monitor_request_id);
+        free(idl->lock_name);
+        json_destroy(idl->lock_request_id);
         free(idl);
     }
 }
@@ -239,7 +272,9 @@ ovsdb_idl_clear(struct ovsdb_idl *idl)
 /* Processes a batch of messages from the database server on 'idl'.  Returns
  * true if the database as seen through 'idl' changed, false if it did not
  * change.  The initial fetch of the entire contents of the remote database is
- * considered to be one kind of change.
+ * considered to be one kind of change.  If 'idl' has been configured to
+ * acquire a database lock (with ovsdb_idl_set_lock()), then successfully
+ * acquiring the lock is also considered to be a change.
  *
  * When this function returns false, the client may continue to use any data
  * structures it obtained from 'idl' in the past.  But when it returns true,
@@ -265,7 +300,7 @@ ovsdb_idl_run(struct ovsdb_idl *idl)
     assert(!idl->txn);
     jsonrpc_session_run(idl->session);
     for (i = 0; jsonrpc_session_is_connected(idl->session) && i < 50; i++) {
-        struct jsonrpc_msg *msg, *reply;
+        struct jsonrpc_msg *msg;
         unsigned int seqno;
 
         seqno = jsonrpc_session_get_seqno(idl->session);
@@ -273,6 +308,9 @@ ovsdb_idl_run(struct ovsdb_idl *idl)
             idl->last_monitor_request_seqno = seqno;
             ovsdb_idl_txn_abort_all(idl);
             ovsdb_idl_send_monitor_request(idl);
+            if (idl->lock_name) {
+                ovsdb_idl_send_lock_request(idl);
+            }
             break;
         }
 
@@ -281,25 +319,38 @@ ovsdb_idl_run(struct ovsdb_idl *idl)
             break;
         }
 
-        reply = NULL;
         if (msg->type == JSONRPC_NOTIFY
                    && !strcmp(msg->method, "update")
                    && msg->params->type == JSON_ARRAY
                    && msg->params->u.array.n == 2
                    && msg->params->u.array.elems[0]->type == JSON_NULL) {
+            /* Database contents changed. */
             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1]);
         } else if (msg->type == JSONRPC_REPLY
                    && idl->monitor_request_id
                    && json_equal(idl->monitor_request_id, msg->id)) {
+            /* Reply to our "monitor" request. */
             idl->change_seqno++;
             json_destroy(idl->monitor_request_id);
             idl->monitor_request_id = NULL;
             ovsdb_idl_clear(idl);
             ovsdb_idl_parse_update(idl, msg->result);
         } else if (msg->type == JSONRPC_REPLY
-                   && msg->id && msg->id->type == JSON_STRING
+                   && idl->lock_request_id
+                   && json_equal(idl->lock_request_id, msg->id)) {
+            /* Reply to our "lock" request. */
+            ovsdb_idl_parse_lock_reply(idl, msg->result);
+        } else if (msg->type == JSONRPC_NOTIFY
+                   && !strcmp(msg->method, "locked")) {
+            /* We got our lock. */
+            ovsdb_idl_parse_lock_notify(idl, msg->params, true);
+        } else if (msg->type == JSONRPC_NOTIFY
+                   && !strcmp(msg->method, "stolen")) {
+            /* Someone else stole our lock. */
+            ovsdb_idl_parse_lock_notify(idl, msg->params, false);
+        } else if (msg->type == JSONRPC_REPLY && msg->id->type == JSON_STRING
                    && !strcmp(msg->id->u.string, "echo")) {
-            /* It's a reply to our echo request.  Ignore it. */
+            /* Reply to our echo request.  Ignore it. */
         } else if ((msg->type == JSONRPC_ERROR
                     || msg->type == JSONRPC_REPLY)
                    && ovsdb_idl_txn_process_reply(idl, msg)) {
@@ -312,9 +363,6 @@ ovsdb_idl_run(struct ovsdb_idl *idl)
                      jsonrpc_session_get_name(idl->session),
                      jsonrpc_msg_type_to_string(msg->type));
         }
-        if (reply) {
-            jsonrpc_session_send(idl->session, reply);
-        }
         jsonrpc_msg_destroy(msg);
     }
 
@@ -359,64 +407,113 @@ ovsdb_idl_force_reconnect(struct ovsdb_idl *idl)
 {
     jsonrpc_session_force_reconnect(idl->session);
 }
-
-static void
-ovsdb_idl_set_mode(struct ovsdb_idl *idl,
-                   const struct ovsdb_idl_column *column,
-                   enum ovsdb_idl_mode mode)
+\f
+static unsigned char *
+ovsdb_idl_get_mode(struct ovsdb_idl *idl,
+                   const struct ovsdb_idl_column *column)
 {
     size_t i;
 
+    assert(!idl->change_seqno);
+
     for (i = 0; i < idl->class->n_tables; i++) {
         const struct ovsdb_idl_table *table = &idl->tables[i];
         const struct ovsdb_idl_table_class *tc = table->class;
 
         if (column >= tc->columns && column < &tc->columns[tc->n_columns]) {
-            unsigned char *modep = &table->modes[column - tc->columns];
-            assert(*modep == OVSDB_IDL_MODE_RW || *modep == mode);
-            *modep = mode;
-            return;
+            return &table->modes[column - tc->columns];
         }
     }
 
     NOT_REACHED();
 }
 
-/* By default, 'idl' replicates all of the columns in the remote database, and
- * ovsdb_idl_run() returns true upon a change to any column in the database.
- * Call this function to avoid alerting ovsdb_idl_run()'s caller upon changes
- * to 'column'.
+static void
+add_ref_table(struct ovsdb_idl *idl, const struct ovsdb_base_type *base)
+{
+    if (base->type == OVSDB_TYPE_UUID && base->u.uuid.refTableName) {
+        struct ovsdb_idl_table *table;
+
+        table = shash_find_data(&idl->table_by_name,
+                                base->u.uuid.refTableName);
+        if (table) {
+            table->need_table = true;
+        } else {
+            VLOG_WARN("%s IDL class missing referenced table %s",
+                      idl->class->database, base->u.uuid.refTableName);
+        }
+    }
+}
+
+/* Turns on OVSDB_IDL_MONITOR and OVSDB_IDL_ALERT for 'column' in 'idl'.  Also
+ * ensures that any tables referenced by 'column' will be replicated, even if
+ * no columns in that table are selected for replication (see
+ * ovsdb_idl_add_table() for more information).
  *
- * This is useful for columns that a client treats as "write-only", that is, it
- * updates them but doesn't want to get alerted about its own updates.  It also
- * won't be alerted about other clients' updates, so this is suitable only for
- * use by a client that "owns" a particular column.
+ * This function is only useful if 'monitor_everything_by_default' was false in
+ * the call to ovsdb_idl_create().  This function should be called between
+ * ovsdb_idl_create() and the first call to ovsdb_idl_run().
+ */
+void
+ovsdb_idl_add_column(struct ovsdb_idl *idl,
+                     const struct ovsdb_idl_column *column)
+{
+    *ovsdb_idl_get_mode(idl, column) = OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT;
+    add_ref_table(idl, &column->type.key);
+    add_ref_table(idl, &column->type.value);
+}
+
+/* Ensures that the table with class 'tc' will be replicated on 'idl' even if
+ * no columns are selected for replication.  This can be useful because it
+ * allows 'idl' to keep track of what rows in the table actually exist, which
+ * in turn allows columns that reference the table to have accurate contents.
+ * (The IDL presents the database with references to rows that do not exist
+ * removed.)
  *
- * The client must be careful not to retain pointers to data in 'column' across
- * calls to ovsdb_idl_run(), even when that function returns false, because
- * the client is not alerted to changes.
+ * This function is only useful if 'monitor_everything_by_default' was false in
+ * the call to ovsdb_idl_create().  This function should be called between
+ * ovsdb_idl_create() and the first call to ovsdb_idl_run().
+ */
+void
+ovsdb_idl_add_table(struct ovsdb_idl *idl,
+                    const struct ovsdb_idl_table_class *tc)
+{
+    size_t i;
+
+    for (i = 0; i < idl->class->n_tables; i++) {
+        struct ovsdb_idl_table *table = &idl->tables[i];
+
+        if (table->class == tc) {
+            table->need_table = true;
+            return;
+        }
+    }
+
+    NOT_REACHED();
+}
+
+/* Turns off OVSDB_IDL_ALERT for 'column' in 'idl'.
  *
- * This function should be called after ovsdb_idl_create(), but before the
- * first call to ovsdb_idl_run().  For any given column, this function may be
- * called or ovsdb_idl_omit() may be called, but not both. */
+ * This function should be called between ovsdb_idl_create() and the first call
+ * to ovsdb_idl_run().
+ */
 void
-ovsdb_idl_set_write_only(struct ovsdb_idl *idl,
-                         const struct ovsdb_idl_column *column)
+ovsdb_idl_omit_alert(struct ovsdb_idl *idl,
+                     const struct ovsdb_idl_column *column)
 {
-    ovsdb_idl_set_mode(idl, column, OVSDB_IDL_MODE_WO);
+    *ovsdb_idl_get_mode(idl, column) &= ~OVSDB_IDL_ALERT;
 }
 
-/* By default, 'idl' replicates all of the columns in the remote database.
- * Call this function to omit replicating 'column'.  This saves CPU time and
- * bandwidth to the database.
+/* Sets the mode for 'column' in 'idl' to 0.  See the big comment above
+ * OVSDB_IDL_MONITOR for details.
  *
- * This function should be called after ovsdb_idl_create(), but before the
- * first call to ovsdb_idl_run().  For any given column, this function may be
- * called or ovsdb_idl_set_write_only() may be called, but not both. */
+ * This function should be called between ovsdb_idl_create() and the first call
+ * to ovsdb_idl_run().
+ */
 void
 ovsdb_idl_omit(struct ovsdb_idl *idl, const struct ovsdb_idl_column *column)
 {
-    ovsdb_idl_set_mode(idl, column, OVSDB_IDL_MODE_NONE);
+    *ovsdb_idl_get_mode(idl, column) = 0;
 }
 \f
 static void
@@ -433,16 +530,22 @@ ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl)
         struct json *monitor_request, *columns;
         size_t j;
 
-        monitor_request = json_object_create();
-        columns = json_array_create_empty();
+        columns = table->need_table ? json_array_create_empty() : NULL;
         for (j = 0; j < tc->n_columns; j++) {
             const struct ovsdb_idl_column *column = &tc->columns[j];
-            if (table->modes[j] != OVSDB_IDL_MODE_NONE) {
+            if (table->modes[j] & OVSDB_IDL_MONITOR) {
+                if (!columns) {
+                    columns = json_array_create_empty();
+                }
                 json_array_add(columns, json_string_create(column->name));
             }
         }
-        json_object_put(monitor_request, "columns", columns);
-        json_object_put(monitor_requests, tc->name, monitor_request);
+
+        if (columns) {
+            monitor_request = json_object_create();
+            json_object_put(monitor_request, "columns", columns);
+            json_object_put(monitor_requests, tc->name, monitor_request);
+        }
     }
 
     json_destroy(idl->monitor_request_id);
@@ -642,7 +745,7 @@ ovsdb_idl_row_update(struct ovsdb_idl_row *row, const struct json *row_json)
 
             if (!ovsdb_datum_equals(old, &datum, &column->type)) {
                 ovsdb_datum_swap(old, &datum);
-                if (table->modes[column_idx] == OVSDB_IDL_MODE_RW) {
+                if (table->modes[column_idx] & OVSDB_IDL_ALERT) {
                     changed = true;
                 }
             } else {
@@ -1041,6 +1144,15 @@ ovsdb_idl_get(const struct ovsdb_idl_row *row,
 
     return ovsdb_idl_read(row, column);
 }
+
+/* Returns false if 'row' was obtained from the IDL, true if it was initialized
+ * to all-zero-bits by some other entity.  If 'row' was set up some other way
+ * then the return value is indeterminate. */
+bool
+ovsdb_idl_row_is_synthetic(const struct ovsdb_idl_row *row)
+{
+    return row->table == NULL;
+}
 \f
 /* Transactions. */
 
@@ -1051,6 +1163,8 @@ const char *
 ovsdb_idl_txn_status_to_string(enum ovsdb_idl_txn_status status)
 {
     switch (status) {
+    case TXN_UNCOMMITTED:
+        return "uncommitted";
     case TXN_UNCHANGED:
         return "unchanged";
     case TXN_INCOMPLETE:
@@ -1061,6 +1175,8 @@ ovsdb_idl_txn_status_to_string(enum ovsdb_idl_txn_status status)
         return "success";
     case TXN_TRY_AGAIN:
         return "try again";
+    case TXN_NOT_LOCKED:
+        return "not locked";
     case TXN_ERROR:
         return "error";
     }
@@ -1077,7 +1193,7 @@ ovsdb_idl_txn_create(struct ovsdb_idl *idl)
     txn->request_id = NULL;
     txn->idl = idl;
     hmap_init(&txn->txn_rows);
-    txn->status = TXN_INCOMPLETE;
+    txn->status = TXN_UNCOMMITTED;
     txn->error = NULL;
     txn->dry_run = false;
     ds_init(&txn->comment);
@@ -1150,7 +1266,7 @@ ovsdb_idl_txn_destroy(struct ovsdb_idl_txn *txn)
 void
 ovsdb_idl_txn_wait(const struct ovsdb_idl_txn *txn)
 {
-    if (txn->status != TXN_INCOMPLETE) {
+    if (txn->status != TXN_UNCOMMITTED && txn->status != TXN_INCOMPLETE) {
         poll_immediate_wake();
     }
 }
@@ -1288,9 +1404,24 @@ ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
         return txn->status;
     }
 
+    /* If we need a lock but don't have it, give up quickly. */
+    if (txn->idl->lock_name && !ovsdb_idl_has_lock(txn->idl)) {
+        txn->status = TXN_NOT_LOCKED;
+        ovsdb_idl_txn_disassemble(txn);
+        return txn->status;
+    }
+
     operations = json_array_create_1(
         json_string_create(txn->idl->class->database));
 
+    /* Assert that we have the required lock (avoiding a race). */
+    if (txn->idl->lock_name) {
+        struct json *op = json_object_create();
+        json_array_add(operations, op);
+        json_object_put_string(op, "op", "assert");
+        json_object_put_string(op, "lock", txn->idl->lock_name);
+    }
+
     /* Add prerequisites and declarations of new rows. */
     HMAP_FOR_EACH (row, txn_node, &txn->txn_rows) {
         /* XXX check that deleted rows exist even if no prereqs? */
@@ -1330,12 +1461,16 @@ ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
         if (row->old == row->new) {
             continue;
         } else if (!row->new) {
-            struct json *op = json_object_create();
-            json_object_put_string(op, "op", "delete");
-            json_object_put_string(op, "table", class->name);
-            json_object_put(op, "where", where_uuid_equals(&row->uuid));
-            json_array_add(operations, op);
-            any_updates = true;
+            if (class->is_root) {
+                struct json *op = json_object_create();
+                json_object_put_string(op, "op", "delete");
+                json_object_put_string(op, "table", class->name);
+                json_object_put(op, "where", where_uuid_equals(&row->uuid));
+                json_array_add(operations, op);
+                any_updates = true;
+            } else {
+                /* Let ovsdb-server decide whether to really delete it. */
+            }
         } else {
             struct json *row_json;
             struct json *op;
@@ -1349,6 +1484,8 @@ ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
             } else {
                 struct ovsdb_idl_txn_insert *insert;
 
+                any_updates = true;
+
                 json_object_put(op, "uuid-name",
                                 json_string_create_nocopy(
                                     uuid_name_from_uuid(&row->uuid)));
@@ -1369,22 +1506,29 @@ ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
                                                         &class->columns[idx];
 
                     if (row->old
-                        ? !ovsdb_datum_equals(&row->old[idx], &row->new[idx],
-                                              &column->type)
-                        : !ovsdb_datum_is_default(&row->new[idx],
+                        || !ovsdb_datum_is_default(&row->new[idx],
                                                   &column->type)) {
                         json_object_put(row_json, column->name,
                                         substitute_uuids(
                                             ovsdb_datum_to_json(&row->new[idx],
                                                                 &column->type),
                                             txn));
+
+                        /* If anything really changed, consider it an update.
+                         * We can't suppress not-really-changed values earlier
+                         * or transactions would become nonatomic (see the big
+                         * comment inside ovsdb_idl_txn_write()). */
+                        if (!any_updates && row->old &&
+                            !ovsdb_datum_equals(&row->old[idx], &row->new[idx],
+                                                &column->type)) {
+                            any_updates = true;
+                        }
                     }
                 }
             }
 
             if (!row->old || !shash_is_empty(json_object(row_json))) {
                 json_array_add(operations, op);
-                any_updates = true;
             } else {
                 json_destroy(op);
             }
@@ -1443,6 +1587,7 @@ ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
                        "transact", operations, &txn->request_id))) {
         hmap_insert(&txn->idl->outstanding_txns, &txn->hmap_node,
                     json_hash(txn->request_id, 0));
+        txn->status = TXN_INCOMPLETE;
     } else {
         txn->status = TXN_TRY_AGAIN;
     }
@@ -1480,7 +1625,7 @@ void
 ovsdb_idl_txn_abort(struct ovsdb_idl_txn *txn)
 {
     ovsdb_idl_txn_disassemble(txn);
-    if (txn->status == TXN_INCOMPLETE) {
+    if (txn->status == TXN_UNCOMMITTED || txn->status == TXN_INCOMPLETE) {
         txn->status = TXN_ABORTED;
     }
 }
@@ -1539,6 +1684,23 @@ ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
     hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
 }
 
+/* Writes 'datum' to the specified 'column' in 'row_'.  Updates both 'row_'
+ * itself and the structs derived from it (e.g. the "struct ovsrec_*", for
+ * ovs-vswitchd).
+ *
+ * 'datum' must have the correct type for its column.  The IDL does not check
+ * that it meets schema constraints, but ovsdb-server will do so at commit time
+ * so it had better be correct.
+ *
+ * A transaction must be in progress.  Replication of 'column' must not have
+ * been disabled (by calling ovsdb_idl_omit()).
+ *
+ * Usually this function is used indirectly through one of the "set" functions
+ * generated by ovsdb-idlc.
+ *
+ * Takes ownership of what 'datum' points to (and in some cases destroys that
+ * data before returning) but makes a copy of 'datum' itself.  (Commonly
+ * 'datum' is on the caller's stack.) */
 void
 ovsdb_idl_txn_write(const struct ovsdb_idl_row *row_,
                     const struct ovsdb_idl_column *column,
@@ -1550,7 +1712,26 @@ ovsdb_idl_txn_write(const struct ovsdb_idl_row *row_,
 
     assert(row->new != NULL);
     assert(column_idx < class->n_columns);
-    assert(row->table->modes[column_idx] != OVSDB_IDL_MODE_NONE);
+    assert(row->old == NULL ||
+           row->table->modes[column_idx] & OVSDB_IDL_MONITOR);
+
+    /* If this is a write-only column and the datum being written is the same
+     * as the one already there, just skip the update entirely.  This is worth
+     * optimizing because we have a lot of columns that get periodically
+     * refreshed into the database but don't actually change that often.
+     *
+     * We don't do this for read/write columns because that would break
+     * atomicity of transactions--some other client might have written a
+     * different value in that column since we read it.  (But if a whole
+     * transaction only does writes of existing values, without making any real
+     * changes, we will drop the whole transaction later in
+     * ovsdb_idl_txn_commit().) */
+    if (row->table->modes[column_idx] == OVSDB_IDL_MONITOR
+        && ovsdb_datum_equals(ovsdb_idl_read(row, column),
+                              datum, &column->type)) {
+        ovsdb_datum_destroy(datum, &column->type);
+        return;
+    }
 
     if (hmap_node_is_null(&row->txn_node)) {
         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
@@ -1572,6 +1753,32 @@ ovsdb_idl_txn_write(const struct ovsdb_idl_row *row_,
     (column->parse)(row, &row->new[column_idx]);
 }
 
+/* Causes the original contents of 'column' in 'row_' to be verified as a
+ * prerequisite to completing the transaction.  That is, if 'column' in 'row_'
+ * changed (or if 'row_' was deleted) between the time that the IDL originally
+ * read its contents and the time that the transaction commits, then the
+ * transaction aborts and ovsdb_idl_txn_commit() returns TXN_TRY_AGAIN.
+ *
+ * The intention is that, to ensure that no transaction commits based on dirty
+ * reads, an application should call ovsdb_idl_txn_verify() on each data item
+ * read as part of a read-modify-write operation.
+ *
+ * In some cases ovsdb_idl_txn_verify() reduces to a no-op, because the current
+ * value of 'column' is already known:
+ *
+ *   - If 'row_' is a row created by the current transaction (returned by
+ *     ovsdb_idl_txn_insert()).
+ *
+ *   - If 'column' has already been modified (with ovsdb_idl_txn_write())
+ *     within the current transaction.
+ *
+ * Because of the latter property, always call ovsdb_idl_txn_verify() *before*
+ * ovsdb_idl_txn_write() for a given read-modify-write.
+ *
+ * A transaction must be in progress.
+ *
+ * Usually this function is used indirectly through one of the "verify"
+ * functions generated by ovsdb-idlc. */
 void
 ovsdb_idl_txn_verify(const struct ovsdb_idl_row *row_,
                      const struct ovsdb_idl_column *column)
@@ -1581,6 +1788,8 @@ ovsdb_idl_txn_verify(const struct ovsdb_idl_row *row_,
     size_t column_idx = column - class->columns;
 
     assert(row->new != NULL);
+    assert(row->old == NULL ||
+           row->table->modes[column_idx] & OVSDB_IDL_MONITOR);
     if (!row->old
         || (row->written && bitmap_is_set(row->written, column_idx))) {
         return;
@@ -1596,6 +1805,13 @@ ovsdb_idl_txn_verify(const struct ovsdb_idl_row *row_,
     bitmap_set1(row->prereqs, column_idx);
 }
 
+/* Deletes 'row_' from its table.  May free 'row_', so it must not be
+ * accessed afterward.
+ *
+ * A transaction must be in progress.
+ *
+ * Usually this function is used indirectly through one of the "delete"
+ * functions generated by ovsdb-idlc. */
 void
 ovsdb_idl_txn_delete(const struct ovsdb_idl_row *row_)
 {
@@ -1619,6 +1835,18 @@ ovsdb_idl_txn_delete(const struct ovsdb_idl_row *row_)
     row->new = NULL;
 }
 
+/* Inserts and returns a new row in the table with the specified 'class' in the
+ * database with open transaction 'txn'.
+ *
+ * The new row is assigned a provisional UUID.  If 'uuid' is null then one is
+ * randomly generated; otherwise 'uuid' should specify a randomly generated
+ * UUID not otherwise in use.  ovsdb-server will assign a different UUID when
+ * 'txn' is committed, but the IDL will replace any uses of the provisional
+ * UUID in the data to be to be committed by the UUID assigned by
+ * ovsdb-server.
+ *
+ * Usually this function is used indirectly through one of the "insert"
+ * functions generated by ovsdb-idlc. */
 const struct ovsdb_idl_row *
 ovsdb_idl_txn_insert(struct ovsdb_idl_txn *txn,
                      const struct ovsdb_idl_table_class *class,
@@ -1792,6 +2020,7 @@ ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
         struct json_array *ops = &msg->result->u.array;
         int hard_errors = 0;
         int soft_errors = 0;
+        int lock_errors = 0;
         size_t i;
 
         for (i = 0; i < ops->n; i++) {
@@ -1809,6 +2038,8 @@ ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
                     if (error->type == JSON_STRING) {
                         if (!strcmp(error->u.string, "timed out")) {
                             soft_errors++;
+                        } else if (!strcmp(error->u.string, "not owner")) {
+                            lock_errors++;
                         } else if (strcmp(error->u.string, "aborted")) {
                             hard_errors++;
                             ovsdb_idl_txn_set_error_json(txn, op);
@@ -1828,7 +2059,7 @@ ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
             }
         }
 
-        if (!soft_errors && !hard_errors) {
+        if (!soft_errors && !hard_errors && !lock_errors) {
             struct ovsdb_idl_txn_insert *insert;
 
             if (txn->inc_table && !ovsdb_idl_txn_process_inc_reply(txn, ops)) {
@@ -1843,6 +2074,7 @@ ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
         }
 
         status = (hard_errors ? TXN_ERROR
+                  : lock_errors ? TXN_NOT_LOCKED
                   : soft_errors ? TXN_TRY_AGAIN
                   : TXN_SUCCESS);
     }
@@ -1864,4 +2096,139 @@ ovsdb_idl_txn_get_idl (struct ovsdb_idl_txn *txn)
 {
     return txn->idl;
 }
+\f
+/* If 'lock_name' is nonnull, configures 'idl' to obtain the named lock from
+ * the database server and to avoid modifying the database when the lock cannot
+ * be acquired (that is, when another client has the same lock).
+ *
+ * If 'lock_name' is NULL, drops the locking requirement and releases the
+ * lock. */
+void
+ovsdb_idl_set_lock(struct ovsdb_idl *idl, const char *lock_name)
+{
+    assert(!idl->txn);
+    assert(hmap_is_empty(&idl->outstanding_txns));
+
+    if (idl->lock_name && (!lock_name || strcmp(lock_name, idl->lock_name))) {
+        /* Release previous lock. */
+        ovsdb_idl_send_unlock_request(idl);
+        free(idl->lock_name);
+        idl->lock_name = NULL;
+        idl->is_lock_contended = false;
+    }
+
+    if (lock_name && !idl->lock_name) {
+        /* Acquire new lock. */
+        idl->lock_name = xstrdup(lock_name);
+        ovsdb_idl_send_lock_request(idl);
+    }
+}
+
+/* Returns true if 'idl' is configured to obtain a lock and owns that lock.
+ *
+ * Locking and unlocking happens asynchronously from the database client's
+ * point of view, so the information is only useful for optimization (e.g. if
+ * the client doesn't have the lock then there's no point in trying to write to
+ * the database). */
+bool
+ovsdb_idl_has_lock(const struct ovsdb_idl *idl)
+{
+    return idl->has_lock;
+}
+
+/* Returns true if 'idl' is configured to obtain a lock but the database server
+ * has indicated that some other client already owns the requested lock. */
+bool
+ovsdb_idl_is_lock_contended(const struct ovsdb_idl *idl)
+{
+    return idl->is_lock_contended;
+}
+
+static void
+ovsdb_idl_update_has_lock(struct ovsdb_idl *idl, bool new_has_lock)
+{
+    if (new_has_lock && !idl->has_lock) {
+        if (!idl->monitor_request_id) {
+            idl->change_seqno++;
+        } else {
+            /* We're waiting for a monitor reply, so don't signal that the
+             * database changed.  The monitor reply will increment change_seqno
+             * anyhow. */
+        }
+        idl->is_lock_contended = false;
+    }
+    idl->has_lock = new_has_lock;
+}
+
+static void
+ovsdb_idl_send_lock_request__(struct ovsdb_idl *idl, const char *method,
+                              struct json **idp)
+{
+    ovsdb_idl_update_has_lock(idl, false);
+
+    json_destroy(idl->lock_request_id);
+    idl->lock_request_id = NULL;
+
+    if (jsonrpc_session_is_connected(idl->session)) {
+        struct json *params;
+
+        params = json_array_create_1(json_string_create(idl->lock_name));
+        jsonrpc_session_send(idl->session,
+                             jsonrpc_create_request(method, params, idp));
+    }
+}
+
+static void
+ovsdb_idl_send_lock_request(struct ovsdb_idl *idl)
+{
+    ovsdb_idl_send_lock_request__(idl, "lock", &idl->lock_request_id);
+}
+
+static void
+ovsdb_idl_send_unlock_request(struct ovsdb_idl *idl)
+{
+    ovsdb_idl_send_lock_request__(idl, "unlock", NULL);
+}
+
+static void
+ovsdb_idl_parse_lock_reply(struct ovsdb_idl *idl, const struct json *result)
+{
+    bool got_lock;
+
+    json_destroy(idl->lock_request_id);
+    idl->lock_request_id = NULL;
+
+    if (result->type == JSON_OBJECT) {
+        const struct json *locked;
 
+        locked = shash_find_data(json_object(result), "locked");
+        got_lock = locked && locked->type == JSON_TRUE;
+    } else {
+        got_lock = false;
+    }
+
+    ovsdb_idl_update_has_lock(idl, got_lock);
+    if (!got_lock) {
+        idl->is_lock_contended = true;
+    }
+}
+
+static void
+ovsdb_idl_parse_lock_notify(struct ovsdb_idl *idl,
+                            const struct json *params,
+                            bool new_has_lock)
+{
+    if (idl->lock_name
+        && params->type == JSON_ARRAY
+        && json_array(params)->n > 0
+        && json_array(params)->elems[0]->type == JSON_STRING) {
+        const char *lock_name = json_string(json_array(params)->elems[0]);
+
+        if (!strcmp(idl->lock_name, lock_name)) {
+            ovsdb_idl_update_has_lock(idl, new_has_lock);
+            if (!new_has_lock) {
+                idl->is_lock_contended = true;
+            }
+        }
+    }
+}