-# Copyright (c) 2009, 2010, 2011 Nicira Networks
+# Copyright (c) 2009, 2010, 2011, 2012 Nicira, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
'rows' map values. Refer to Row for more details.
- 'change_seqno': A number that represents the IDL's state. When the IDL
- is updated (by Idl.run()), its value changes.
+ is updated (by Idl.run()), its value changes. The sequence number can
+ occasionally change even if the database does not. This happens if the
+ connection to the database drops and reconnects, which causes the
+ database contents to be reloaded even if they didn't change. (It could
+ also happen if the database server sends out a "change" that reflects
+ what the IDL already thought was in the database. The database server is
+ not supposed to do that, but bugs could in theory cause it to do so.)
- 'lock_name': The name of the lock configured with Idl.set_lock(), or None
if no lock is configured.
purpose of the return value of Idl.run() and Idl.change_seqno. This is
useful for columns that the IDL's client will write but not read.
+ As a convenience to users, 'schema' may also be an instance of the
+ SchemaHelper class.
+
The IDL uses and modifies 'schema' directly."""
+ assert isinstance(schema, SchemaHelper)
+ schema = schema.get_idl_schema()
+
self.tables = schema.tables
self._db = schema
self._session = ovs.jsonrpc.Session.open(remote)
self.__dict__["_changes"] = None
del self._table.rows[self.uuid]
+ def increment(self, column_name):
+ """Causes the transaction, when committed, to increment the value of
+ 'column_name' within this row by 1. 'column_name' must have an integer
+ type. After the transaction commits successfully, the client may
+ retrieve the final (incremented) value of 'column_name' with
+ Transaction.get_increment_new_value().
+
+ The client could accomplish something similar by reading and writing
+ and verify()ing columns. However, increment() will never (by itself)
+ cause a transaction to fail because of a verify error.
+
+ The intended use is for incrementing the "next_cfg" column in
+ the Open_vSwitch table."""
+ self._idl.txn._increment(self, column_name)
+
def _uuid_name_from_uuid(uuid):
return "row%s" % str(uuid).replace("-", "_")
class Transaction(object):
+ """A transaction may modify the contents of a database by modifying the
+ values of columns, deleting rows, inserting rows, or adding checks that
+ columns in the database have not changed ("verify" operations), through
+ Row methods.
+
+ Reading and writing columns and inserting and deleting rows are all
+ straightforward. The reasons to verify columns are less obvious.
+ Verification is the key to maintaining transactional integrity. Because
+ OVSDB handles multiple clients, it can happen that between the time that
+ OVSDB client A reads a column and writes a new value, OVSDB client B has
+ written that column. Client A's write should not ordinarily overwrite
+ client B's, especially if the column in question is a "map" column that
+ contains several more or less independent data items. If client A adds a
+ "verify" operation before it writes the column, then the transaction fails
+ in case client B modifies it first. Client A will then see the new value
+ of the column and compose a new transaction based on the new contents
+ written by client B.
+
+ When a transaction is complete, which must be before the next call to
+ Idl.run(), call Transaction.commit() or Transaction.abort().
+
+ The life-cycle of a transaction looks like this:
+
+ 1. Create the transaction and record the initial sequence number:
+
+ seqno = idl.change_seqno(idl)
+ txn = Transaction(idl)
+
+ 2. Modify the database with Row and Transaction methods.
+
+ 3. Commit the transaction by calling Transaction.commit(). The first call
+ to this function probably returns Transaction.INCOMPLETE. The client
+ must keep calling again along as this remains true, calling Idl.run() in
+ between to let the IDL do protocol processing. (If the client doesn't
+ have anything else to do in the meantime, it can use
+ Transaction.commit_block() to avoid having to loop itself.)
+
+ 4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
+ to change from the saved 'seqno' (it's possible that it's already
+ changed, in which case the client should not wait at all), then start
+ over from step 1. Only a call to Idl.run() will change the return value
+ of Idl.change_seqno. (Transaction.commit_block() calls Idl.run().)"""
+
# Status values that Transaction.commit() can return.
UNCOMMITTED = "uncommitted" # Not yet committed or aborted.
UNCHANGED = "unchanged" # Transaction didn't include any changes.
SUCCESS = "success" # Commit successful.
TRY_AGAIN = "try again" # Commit failed because a "verify" operation
# reported an inconsistency, due to a network
- # problem, or other transient failure.
+ # problem, or other transient failure. Wait
+ # for a change, then try again.
NOT_LOCKED = "not locked" # Server hasn't given us the lock yet.
ERROR = "error" # Commit failed due to a hard error.
self._status = Transaction.UNCOMMITTED
self._error = None
self._comments = []
+ self._commit_seqno = self.idl.change_seqno
- self._inc_table = None
+ self._inc_row = None
self._inc_column = None
- self._inc_where = None
self._inserted_rows = {} # Map from UUID to _InsertedRow
relatively human-readable form.)"""
self._comments.append(comment)
- def increment(self, table, column, where):
- assert not self._inc_table
- self._inc_table = table
- self._inc_column = column
- self._inc_where = where
-
def wait(self, poller):
+ """Causes poll_block() to wake up if this transaction has completed
+ committing."""
if self._status not in (Transaction.UNCOMMITTED,
Transaction.INCOMPLETE):
poller.immediate_wake()
self._txn_rows = {}
def commit(self):
- """Attempts to commit this transaction and returns the status of the
- commit operation, one of the constants declared as class attributes.
- If the return value is Transaction.INCOMPLETE, then the transaction is
- not yet complete and the caller should try calling again later, after
- calling Idl.run() to run the Idl.
+ """Attempts to commit 'txn'. Returns the status of the commit
+ operation, one of the following constants:
+
+ Transaction.INCOMPLETE:
+
+ The transaction is in progress, but not yet complete. The caller
+ should call again later, after calling Idl.run() to let the
+ IDL do OVSDB protocol processing.
+
+ Transaction.UNCHANGED:
+
+ The transaction is complete. (It didn't actually change the
+ database, so the IDL didn't send any request to the database
+ server.)
+
+ Transaction.ABORTED:
+
+ The caller previously called Transaction.abort().
+
+ Transaction.SUCCESS:
+
+ The transaction was successful. The update made by the
+ transaction (and possibly other changes made by other database
+ clients) should already be visible in the IDL.
+
+ Transaction.TRY_AGAIN:
+
+ The transaction failed for some transient reason, e.g. because a
+ "verify" operation reported an inconsistency or due to a network
+ problem. The caller should wait for a change to the database,
+ then compose a new transaction, and commit the new transaction.
+
+ Use Idl.change_seqno to wait for a change in the database. It is
+ important to use its value *before* the initial call to
+ Transaction.commit() as the baseline for this purpose, because
+ the change that one should wait for can happen after the initial
+ call but before the call that returns Transaction.TRY_AGAIN, and
+ using some other baseline value in that situation could cause an
+ indefinite wait if the database rarely changes.
+
+ Transaction.NOT_LOCKED:
+
+ The transaction failed because the IDL has been configured to
+ require a database lock (with Idl.set_lock()) but didn't
+ get it yet or has already lost it.
Committing a transaction rolls back all of the changes that it made to
- the Idl's copy of the database. If the transaction commits
+ the IDL's copy of the database. If the transaction commits
successfully, then the database server will send an update and, thus,
- the Idl will be updated with the committed changes."""
+ the IDL will be updated with the committed changes."""
# The status can only change if we're the active transaction.
# (Otherwise, our status will change only in Idl.run().)
if self != self.idl.txn:
operations.append(op)
# Add increment.
- if self._inc_table and any_updates:
+ if self._inc_row and any_updates:
self._inc_index = len(operations) - 1
operations.append({"op": "mutate",
- "table": self._inc_table,
+ "table": self._inc_row._table.name,
"where": self._substitute_uuids(
- self._inc_where),
+ _where_uuid_equals(self._inc_row.uuid)),
"mutations": [[self._inc_column, "+=", 1]]})
operations.append({"op": "select",
- "table": self._inc_table,
+ "table": self._inc_row._table.name,
"where": self._substitute_uuids(
- self._inc_where),
+ _where_uuid_equals(self._inc_row.uuid)),
"columns": [self._inc_column]})
# Add comment.
return self._status
def commit_block(self):
+ """Attempts to commit this transaction, blocking until the commit
+ either succeeds or fails. Returns the final commit status, which may
+ be any Transaction.* value other than Transaction.INCOMPLETE.
+
+ This function calls Idl.run() on this transaction'ss IDL, so it may
+ cause Idl.change_seqno to change."""
while True:
status = self.commit()
if status != Transaction.INCOMPLETE:
poller.block()
def get_increment_new_value(self):
+ """Returns the final (incremented) value of the column in this
+ transaction that was set to be incremented by Row.increment. This
+ transaction must have committed successfully."""
assert self._status == Transaction.SUCCESS
return self._inc_new_value
return inserted_row.real
return None
+ def _increment(self, row, column):
+ assert not self._inc_row
+ self._inc_row = row
+ self._inc_column = column
+
def _write(self, row, column, datum):
assert row._changes is not None
vlog.warn("operation reply is not JSON null or object")
if not soft_errors and not hard_errors and not lock_errors:
- if self._inc_table and not self.__process_inc_reply(ops):
+ if self._inc_row and not self.__process_inc_reply(ops):
hard_errors = True
for insert in self._inserted_rows.itervalues():
insert.real = uuid_
return True
+
+
+class SchemaHelper(object):
+ """IDL Schema helper.
+
+ This class encapsulates the logic required to generate schemas suitable
+ for creating 'ovs.db.idl.Idl' objects. Clients should register columns
+ they are interested in using register_columns(). When finished, the
+ get_idl_schema() function may be called.
+
+ The location on disk of the schema used may be found in the
+ 'schema_location' variable."""
+
+ def __init__(self, location=None):
+ """Creates a new Schema object."""
+
+ if location is None:
+ location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
+
+ self.schema_location = location
+ self._tables = {}
+ self._all = False
+
+ def register_columns(self, table, columns):
+ """Registers interest in the given 'columns' of 'table'. Future calls
+ to get_idl_schema() will include 'table':column for each column in
+ 'columns'. This function automatically avoids adding duplicate entries
+ to the schema.
+
+ 'table' must be a string.
+ 'columns' must be a list of strings.
+ """
+
+ assert type(table) is str
+ assert type(columns) is list
+
+ columns = set(columns) | self._tables.get(table, set())
+ self._tables[table] = columns
+
+ def register_all(self):
+ """Registers interest in every column of every table."""
+ self._all = True
+
+ def get_idl_schema(self):
+ """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
+ object based on columns registered using the register_columns()
+ function."""
+
+ schema = ovs.db.schema.DbSchema.from_json(
+ ovs.json.from_file(self.schema_location))
+
+ if not self._all:
+ schema_tables = {}
+ for table, columns in self._tables.iteritems():
+ schema_tables[table] = (
+ self._keep_table_columns(schema, table, columns))
+
+ schema.tables = schema_tables
+ return schema
+
+ def _keep_table_columns(self, schema, table_name, columns):
+ assert table_name in schema.tables
+ table = schema.tables[table_name]
+
+ new_columns = {}
+ for column_name in columns:
+ assert type(column_name) is str
+ assert column_name in table.columns
+
+ new_columns[column_name] = table.columns[column_name]
+
+ table.columns = new_columns
+ return table