unsigned int last_monitor_request_seqno;
unsigned int change_seqno;
+ /* Database locking. */
+ char *lock_name; /* Name of lock we need, NULL if none. */
+ bool has_lock; /* Has db server told us we have the lock? */
+ bool is_lock_contended; /* Has db server told us we can't get lock? */
+ struct json *lock_request_id; /* JSON-RPC ID of in-flight lock request. */
+
/* Transaction support. */
struct ovsdb_idl_txn *txn;
struct hmap outstanding_txns;
static bool ovsdb_idl_txn_process_reply(struct ovsdb_idl *,
const struct jsonrpc_msg *msg);
+static void ovsdb_idl_send_lock_request(struct ovsdb_idl *);
+static void ovsdb_idl_send_unlock_request(struct ovsdb_idl *);
+static void ovsdb_idl_parse_lock_reply(struct ovsdb_idl *,
+ const struct json *);
+static void ovsdb_idl_parse_lock_notify(struct ovsdb_idl *,
+ const struct json *params,
+ bool new_has_lock);
+
/* Creates and returns a connection to database 'remote', which should be in a
* form acceptable to jsonrpc_session_open(). The connection will maintain an
* in-memory replica of the remote database whose schema is described by
shash_destroy(&idl->table_by_name);
free(idl->tables);
json_destroy(idl->monitor_request_id);
+ free(idl->lock_name);
+ json_destroy(idl->lock_request_id);
free(idl);
}
}
/* Processes a batch of messages from the database server on 'idl'. Returns
* true if the database as seen through 'idl' changed, false if it did not
* change. The initial fetch of the entire contents of the remote database is
- * considered to be one kind of change.
+ * considered to be one kind of change. If 'idl' has been configured to
+ * acquire a database lock (with ovsdb_idl_set_lock()), then successfully
+ * acquiring the lock is also considered to be a change.
*
* When this function returns false, the client may continue to use any data
* structures it obtained from 'idl' in the past. But when it returns true,
idl->last_monitor_request_seqno = seqno;
ovsdb_idl_txn_abort_all(idl);
ovsdb_idl_send_monitor_request(idl);
+ if (idl->lock_name) {
+ ovsdb_idl_send_lock_request(idl);
+ }
break;
}
&& msg->params->type == JSON_ARRAY
&& msg->params->u.array.n == 2
&& msg->params->u.array.elems[0]->type == JSON_NULL) {
+ /* Database contents changed. */
ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1]);
} else if (msg->type == JSONRPC_REPLY
&& idl->monitor_request_id
&& json_equal(idl->monitor_request_id, msg->id)) {
+ /* Reply to our "monitor" request. */
idl->change_seqno++;
json_destroy(idl->monitor_request_id);
idl->monitor_request_id = NULL;
ovsdb_idl_clear(idl);
ovsdb_idl_parse_update(idl, msg->result);
+ } else if (msg->type == JSONRPC_REPLY
+ && idl->lock_request_id
+ && json_equal(idl->lock_request_id, msg->id)) {
+ /* Reply to our "lock" request. */
+ ovsdb_idl_parse_lock_reply(idl, msg->result);
+ } else if (msg->type == JSONRPC_NOTIFY
+ && !strcmp(msg->method, "locked")) {
+ /* We got our lock. */
+ ovsdb_idl_parse_lock_notify(idl, msg->params, true);
+ } else if (msg->type == JSONRPC_NOTIFY
+ && !strcmp(msg->method, "stolen")) {
+ /* Someone else stole our lock. */
+ ovsdb_idl_parse_lock_notify(idl, msg->params, false);
} else if (msg->type == JSONRPC_REPLY && msg->id->type == JSON_STRING
&& !strcmp(msg->id->u.string, "echo")) {
- /* It's a reply to our echo request. Ignore it. */
+ /* Reply to our echo request. Ignore it. */
} else if ((msg->type == JSONRPC_ERROR
|| msg->type == JSONRPC_REPLY)
&& ovsdb_idl_txn_process_reply(idl, msg)) {
return ovsdb_idl_read(row, column);
}
+
+/* Returns false if 'row' was obtained from the IDL, true if it was initialized
+ * to all-zero-bits by some other entity. If 'row' was set up some other way
+ * then the return value is indeterminate. */
+bool
+ovsdb_idl_row_is_synthetic(const struct ovsdb_idl_row *row)
+{
+ return row->table == NULL;
+}
\f
/* Transactions. */
ovsdb_idl_txn_status_to_string(enum ovsdb_idl_txn_status status)
{
switch (status) {
+ case TXN_UNCOMMITTED:
+ return "uncommitted";
case TXN_UNCHANGED:
return "unchanged";
case TXN_INCOMPLETE:
return "success";
case TXN_TRY_AGAIN:
return "try again";
+ case TXN_NOT_LOCKED:
+ return "not locked";
case TXN_ERROR:
return "error";
}
txn->request_id = NULL;
txn->idl = idl;
hmap_init(&txn->txn_rows);
- txn->status = TXN_INCOMPLETE;
+ txn->status = TXN_UNCOMMITTED;
txn->error = NULL;
txn->dry_run = false;
ds_init(&txn->comment);
void
ovsdb_idl_txn_wait(const struct ovsdb_idl_txn *txn)
{
- if (txn->status != TXN_INCOMPLETE) {
+ if (txn->status != TXN_UNCOMMITTED && txn->status != TXN_INCOMPLETE) {
poll_immediate_wake();
}
}
return txn->status;
}
+ /* If we need a lock but don't have it, give up quickly. */
+ if (txn->idl->lock_name && !ovsdb_idl_has_lock(txn->idl)) {
+ txn->status = TXN_NOT_LOCKED;
+ ovsdb_idl_txn_disassemble(txn);
+ return txn->status;
+ }
+
operations = json_array_create_1(
json_string_create(txn->idl->class->database));
+ /* Assert that we have the required lock (avoiding a race). */
+ if (txn->idl->lock_name) {
+ struct json *op = json_object_create();
+ json_array_add(operations, op);
+ json_object_put_string(op, "op", "assert");
+ json_object_put_string(op, "lock", txn->idl->lock_name);
+ }
+
/* Add prerequisites and declarations of new rows. */
HMAP_FOR_EACH (row, txn_node, &txn->txn_rows) {
/* XXX check that deleted rows exist even if no prereqs? */
if (row->old == row->new) {
continue;
} else if (!row->new) {
- struct json *op = json_object_create();
- json_object_put_string(op, "op", "delete");
- json_object_put_string(op, "table", class->name);
- json_object_put(op, "where", where_uuid_equals(&row->uuid));
- json_array_add(operations, op);
- any_updates = true;
+ if (class->is_root) {
+ struct json *op = json_object_create();
+ json_object_put_string(op, "op", "delete");
+ json_object_put_string(op, "table", class->name);
+ json_object_put(op, "where", where_uuid_equals(&row->uuid));
+ json_array_add(operations, op);
+ any_updates = true;
+ } else {
+ /* Let ovsdb-server decide whether to really delete it. */
+ }
} else {
struct json *row_json;
struct json *op;
} else {
struct ovsdb_idl_txn_insert *insert;
+ any_updates = true;
+
json_object_put(op, "uuid-name",
json_string_create_nocopy(
uuid_name_from_uuid(&row->uuid)));
ovsdb_datum_to_json(&row->new[idx],
&column->type),
txn));
+
+ /* If anything really changed, consider it an update.
+ * We can't suppress not-really-changed values earlier
+ * or transactions would become nonatomic (see the big
+ * comment inside ovsdb_idl_txn_write()). */
+ if (!any_updates && row->old &&
+ !ovsdb_datum_equals(&row->old[idx], &row->new[idx],
+ &column->type)) {
+ any_updates = true;
+ }
}
}
}
if (!row->old || !shash_is_empty(json_object(row_json))) {
json_array_add(operations, op);
- any_updates = true;
} else {
json_destroy(op);
}
"transact", operations, &txn->request_id))) {
hmap_insert(&txn->idl->outstanding_txns, &txn->hmap_node,
json_hash(txn->request_id, 0));
+ txn->status = TXN_INCOMPLETE;
} else {
txn->status = TXN_TRY_AGAIN;
}
ovsdb_idl_txn_abort(struct ovsdb_idl_txn *txn)
{
ovsdb_idl_txn_disassemble(txn);
- if (txn->status == TXN_INCOMPLETE) {
+ if (txn->status == TXN_UNCOMMITTED || txn->status == TXN_INCOMPLETE) {
txn->status = TXN_ABORTED;
}
}
* been disabled (by calling ovsdb_idl_omit()).
*
* Usually this function is used indirectly through one of the "set" functions
- * generated by ovsdb-idlc. */
+ * generated by ovsdb-idlc.
+ *
+ * Takes ownership of what 'datum' points to (and in some cases destroys that
+ * data before returning) but makes a copy of 'datum' itself. (Commonly
+ * 'datum' is on the caller's stack.) */
void
ovsdb_idl_txn_write(const struct ovsdb_idl_row *row_,
const struct ovsdb_idl_column *column,
*
* We don't do this for read/write columns because that would break
* atomicity of transactions--some other client might have written a
- * different value in that column since we read it. */
+ * different value in that column since we read it. (But if a whole
+ * transaction only does writes of existing values, without making any real
+ * changes, we will drop the whole transaction later in
+ * ovsdb_idl_txn_commit().) */
if (row->table->modes[column_idx] == OVSDB_IDL_MONITOR
&& ovsdb_datum_equals(ovsdb_idl_read(row, column),
datum, &column->type)) {
struct json_array *ops = &msg->result->u.array;
int hard_errors = 0;
int soft_errors = 0;
+ int lock_errors = 0;
size_t i;
for (i = 0; i < ops->n; i++) {
if (error->type == JSON_STRING) {
if (!strcmp(error->u.string, "timed out")) {
soft_errors++;
+ } else if (!strcmp(error->u.string, "not owner")) {
+ lock_errors++;
} else if (strcmp(error->u.string, "aborted")) {
hard_errors++;
ovsdb_idl_txn_set_error_json(txn, op);
}
}
- if (!soft_errors && !hard_errors) {
+ if (!soft_errors && !hard_errors && !lock_errors) {
struct ovsdb_idl_txn_insert *insert;
if (txn->inc_table && !ovsdb_idl_txn_process_inc_reply(txn, ops)) {
}
status = (hard_errors ? TXN_ERROR
+ : lock_errors ? TXN_NOT_LOCKED
: soft_errors ? TXN_TRY_AGAIN
: TXN_SUCCESS);
}
{
return txn->idl;
}
+\f
+/* If 'lock_name' is nonnull, configures 'idl' to obtain the named lock from
+ * the database server and to avoid modifying the database when the lock cannot
+ * be acquired (that is, when another client has the same lock).
+ *
+ * If 'lock_name' is NULL, drops the locking requirement and releases the
+ * lock. */
+void
+ovsdb_idl_set_lock(struct ovsdb_idl *idl, const char *lock_name)
+{
+ assert(!idl->txn);
+ assert(hmap_is_empty(&idl->outstanding_txns));
+
+ if (idl->lock_name && (!lock_name || strcmp(lock_name, idl->lock_name))) {
+ /* Release previous lock. */
+ ovsdb_idl_send_unlock_request(idl);
+ free(idl->lock_name);
+ idl->lock_name = NULL;
+ idl->is_lock_contended = false;
+ }
+
+ if (lock_name && !idl->lock_name) {
+ /* Acquire new lock. */
+ idl->lock_name = xstrdup(lock_name);
+ ovsdb_idl_send_lock_request(idl);
+ }
+}
+
+/* Returns true if 'idl' is configured to obtain a lock and owns that lock.
+ *
+ * Locking and unlocking happens asynchronously from the database client's
+ * point of view, so the information is only useful for optimization (e.g. if
+ * the client doesn't have the lock then there's no point in trying to write to
+ * the database). */
+bool
+ovsdb_idl_has_lock(const struct ovsdb_idl *idl)
+{
+ return idl->has_lock;
+}
+/* Returns true if 'idl' is configured to obtain a lock but the database server
+ * has indicated that some other client already owns the requested lock. */
+bool
+ovsdb_idl_is_lock_contended(const struct ovsdb_idl *idl)
+{
+ return idl->is_lock_contended;
+}
+
+static void
+ovsdb_idl_update_has_lock(struct ovsdb_idl *idl, bool new_has_lock)
+{
+ if (new_has_lock && !idl->has_lock) {
+ if (!idl->monitor_request_id) {
+ idl->change_seqno++;
+ } else {
+ /* We're waiting for a monitor reply, so don't signal that the
+ * database changed. The monitor reply will increment change_seqno
+ * anyhow. */
+ }
+ idl->is_lock_contended = false;
+ }
+ idl->has_lock = new_has_lock;
+}
+
+static void
+ovsdb_idl_send_lock_request__(struct ovsdb_idl *idl, const char *method,
+ struct json **idp)
+{
+ ovsdb_idl_update_has_lock(idl, false);
+
+ json_destroy(idl->lock_request_id);
+ idl->lock_request_id = NULL;
+
+ if (jsonrpc_session_is_connected(idl->session)) {
+ struct json *params;
+
+ params = json_array_create_1(json_string_create(idl->lock_name));
+ jsonrpc_session_send(idl->session,
+ jsonrpc_create_request(method, params, idp));
+ }
+}
+
+static void
+ovsdb_idl_send_lock_request(struct ovsdb_idl *idl)
+{
+ ovsdb_idl_send_lock_request__(idl, "lock", &idl->lock_request_id);
+}
+
+static void
+ovsdb_idl_send_unlock_request(struct ovsdb_idl *idl)
+{
+ ovsdb_idl_send_lock_request__(idl, "unlock", NULL);
+}
+
+static void
+ovsdb_idl_parse_lock_reply(struct ovsdb_idl *idl, const struct json *result)
+{
+ bool got_lock;
+
+ json_destroy(idl->lock_request_id);
+ idl->lock_request_id = NULL;
+
+ if (result->type == JSON_OBJECT) {
+ const struct json *locked;
+
+ locked = shash_find_data(json_object(result), "locked");
+ got_lock = locked && locked->type == JSON_TRUE;
+ } else {
+ got_lock = false;
+ }
+
+ ovsdb_idl_update_has_lock(idl, got_lock);
+ if (!got_lock) {
+ idl->is_lock_contended = true;
+ }
+}
+
+static void
+ovsdb_idl_parse_lock_notify(struct ovsdb_idl *idl,
+ const struct json *params,
+ bool new_has_lock)
+{
+ if (idl->lock_name
+ && params->type == JSON_ARRAY
+ && json_array(params)->n > 0
+ && json_array(params)->elems[0]->type == JSON_STRING) {
+ const char *lock_name = json_string(json_array(params)->elems[0]);
+
+ if (!strcmp(idl->lock_name, lock_name)) {
+ ovsdb_idl_update_has_lock(idl, new_has_lock);
+ if (!new_has_lock) {
+ idl->is_lock_contended = true;
+ }
+ }
+ }
+}