python: Implement write support in Python IDL for OVSDB.
authorBen Pfaff <blp@nicira.com>
Wed, 21 Sep 2011 17:43:03 +0000 (10:43 -0700)
committerBen Pfaff <blp@nicira.com>
Fri, 23 Sep 2011 21:23:16 +0000 (14:23 -0700)
Until now, the Python bindings for OVSDB have not supported writing to the
database.  Instead, writes had to be done with "ovs-vsctl" subprocesses.
This commit adds write support and brings the Python bindings in line with
the C bindings.

This commit deletes the Python-specific IDL tests in favor of using the
same tests as the C version of the IDL, which now pass with both
implementations.

This commit updates the two users of the Python IDL to use the new write
support.  I tested this updates only by writing unit tests for them,
which appear in upcoming commits.

14 files changed:
debian/ovs-monitor-ipsec
ovsdb/automake.mk
ovsdb/ovsdb-idlc.in
python/ovs/db/data.py
python/ovs/db/idl.py
python/ovs/db/schema.py
python/ovs/db/types.py
tests/atlocal.in
tests/automake.mk
tests/ovsdb-idl-py.at [deleted file]
tests/ovsdb-idl.at
tests/ovsdb.at
tests/test-ovsdb.py
xenserver/usr_share_openvswitch_scripts_ovs-xapi-sync

index b9a4126..c123188 100755 (executable)
@@ -33,6 +33,7 @@ import socket
 import subprocess
 import sys
 
+import ovs.dirs
 from ovs.db import error
 from ovs.db import types
 import ovs.util
@@ -376,7 +377,7 @@ def keep_table_columns(schema, table_name, column_types):
     table.columns = new_columns
     return table
  
-def monitor_uuid_schema_cb(schema):
+def prune_schema(schema):
     string_type = types.Type(types.BaseType(types.StringType))
     optional_ssl_type = types.Type(types.BaseType(types.UuidType,
                                                   ref_table_name='SSL'), None, 0, 1)
@@ -425,18 +426,17 @@ def update_ipsec(ipsec, interfaces, new_interfaces):
             s_log.warning("skipping ipsec config for %s: %s" % (name, msg))
 
 def get_ssl_cert(data):
-    for ovs_rec in data["Open_vSwitch"].itervalues():
-        if ovs_rec.ssl.as_list():
-            ssl_rec = data["SSL"][ovs_rec.ssl.as_scalar()]
-            return (ssl_rec.certificate.as_scalar(),
-                    ssl_rec.private_key.as_scalar())
+    for ovs_rec in data["Open_vSwitch"].rows.itervalues():
+        ssl = ovs_rec.ssl
+        if ssl and ssl.certificate and ssl.private_key:
+            return (ssl.certificate, ssl.private_key)
 
     return None
 
 def main(argv):
     try:
         options, args = getopt.gnu_getopt(
-            argv[1:], 'h', ['help'] + ovs.daemon.LONG_OPTIONS)
+            argv[1:], 'h', ['help', 'root-prefix='] + ovs.daemon.LONG_OPTIONS)
     except getopt.GetoptError, geo:
         sys.stderr.write("%s: %s\n" % (ovs.util.PROGRAM_NAME, geo.msg))
         sys.exit(1)
@@ -444,6 +444,9 @@ def main(argv):
     for key, value in options:
         if key in ['-h', '--help']:
             usage()
+        elif key == '--root-prefix':
+            global root_prefix
+            root_prefix = value
         elif not ovs.daemon.parse_opt(key, value):
             sys.stderr.write("%s: unhandled option %s\n"
                              % (ovs.util.PROGRAM_NAME, key))
@@ -455,7 +458,11 @@ def main(argv):
         sys.exit(1)
 
     remote = args[0]
-    idl = ovs.db.idl.Idl(remote, "Open_vSwitch", monitor_uuid_schema_cb)
+
+    schema_file = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
+    schema = ovs.db.schema.DbSchema.from_json(ovs.json.from_file(schema_file))
+    prune_schema(schema)
+    idl = ovs.db.idl.Idl(remote, schema)
 
     ovs.daemon.daemonize()
 
@@ -469,20 +476,21 @@ def main(argv):
             poller.block()
             continue
 
-        ssl_cert = get_ssl_cert(idl.data)
+        ssl_cert = get_ssl_cert(idl.tables)
  
         new_interfaces = {}
-        for rec in idl.data["Interface"].itervalues():
-            if rec.type.as_scalar() == "ipsec_gre":
-                name = rec.name.as_scalar()
+        for rec in idl.tables["Interface"].rows.itervalues():
+            if rec.type == "ipsec_gre":
+                name = rec.name
+                options = rec.options
                 entry = {
-                    "remote_ip": rec.options.get("remote_ip"),
-                    "local_ip": rec.options.get("local_ip", "0.0.0.0/0"),
-                    "certificate": rec.options.get("certificate"),
-                    "private_key": rec.options.get("private_key"),
-                    "use_ssl_cert": rec.options.get("use_ssl_cert"),
-                    "peer_cert": rec.options.get("peer_cert"),
-                    "psk": rec.options.get("psk") }
+                    "remote_ip": options.get("remote_ip"),
+                    "local_ip": options.get("local_ip", "0.0.0.0/0"),
+                    "certificate": options.get("certificate"),
+                    "private_key": options.get("private_key"),
+                    "use_ssl_cert": options.get("use_ssl_cert"),
+                    "peer_cert": options.get("peer_cert"),
+                    "psk": options.get("psk") }
 
                 if entry["peer_cert"] and entry["psk"]:
                     s_log.warning("both 'peer_cert' and 'psk' defined for %s" 
index 39bc65f..5d0b6d7 100644 (file)
@@ -67,7 +67,7 @@ EXTRA_DIST += \
        ovsdb/ovsdb-idlc.in \
        ovsdb/ovsdb-idlc.1
 DISTCLEANFILES += ovsdb/ovsdb-idlc
-SUFFIXES += .ovsidl
+SUFFIXES += .ovsidl .ovsschema .py
 OVSDB_IDLC = $(run_python) $(srcdir)/ovsdb/ovsdb-idlc.in
 .ovsidl.c:
        $(OVSDB_IDLC) c-idl-source $< > $@.tmp
@@ -75,6 +75,9 @@ OVSDB_IDLC = $(run_python) $(srcdir)/ovsdb/ovsdb-idlc.in
 .ovsidl.h:
        $(OVSDB_IDLC) c-idl-header $< > $@.tmp
        mv $@.tmp $@
+.ovsschema.py:
+       $(OVSDB_IDLC) python-module $< > $@.tmp
+       mv $@.tmp $@
 
 EXTRA_DIST += $(OVSIDL_BUILT)
 BUILT_SOURCES += $(OVSIDL_BUILT)
index 3392c35..4e40288 100755 (executable)
@@ -548,6 +548,21 @@ void
         print "    %s_columns_init();" % structName
     print "}"
 
+def print_python_module(schema_file):
+    schema = ovs.db.schema.DbSchema.from_json(ovs.json.from_file(schema_file))
+    print """\
+# Generated automatically -- do not modify!    -*- buffer-read-only: t -*-
+
+import ovs.db.schema
+import ovs.json
+
+__schema_json = \"\"\"
+%s
+\"\"\"
+
+schema = ovs.db.schema.DbSchema.from_json(ovs.json.from_string(__schema_json))
+""" % ovs.json.to_string(schema.to_json(), pretty=True)
+
 def ovsdb_escape(string):
     def escape(match):
         c = match.group(0)
@@ -569,8 +584,6 @@ def ovsdb_escape(string):
             return '\\x%02x' % ord(c)
     return re.sub(r'["\\\000-\037]', escape, string)
 
-
-
 def usage():
     print """\
 %(argv0)s: ovsdb schema compiler
@@ -580,6 +593,7 @@ The following commands are supported:
   annotate SCHEMA ANNOTATIONS print SCHEMA combined with ANNOTATIONS
   c-idl-header IDL            print C header file for IDL
   c-idl-source IDL            print C source file for IDL implementation
+  python-module IDL           print Python module for IDL
   nroff IDL                   print schema documentation in nroff format
 
 The following options are also available:
@@ -618,7 +632,8 @@ if __name__ == "__main__":
 
         commands = {"annotate": (annotateSchema, 2),
                     "c-idl-header": (printCIDLHeader, 1),
-                    "c-idl-source": (printCIDLSource, 1)}
+                    "c-idl-source": (printCIDLSource, 1),
+                    "python-module": (print_python_module, 1)}
 
         if not args[0] in commands:
             sys.stderr.write("%s: unknown command \"%s\" "
index f71def9..172d552 100644 (file)
@@ -81,6 +81,18 @@ class Atom(object):
 
     @staticmethod
     def default(type_):
+        """Returns the default value for the given type_, which must be an
+        instance of ovs.db.types.AtomicType.
+
+        The default value for each atomic type is;
+
+          - 0, for integer or real atoms.
+
+          - False, for a boolean atom.
+
+          - "", for a string atom.
+
+          - The all-zeros UUID, for a UUID atom."""
         return Atom(type_)
 
     def is_default(self):
@@ -102,12 +114,21 @@ class Atom(object):
         atom.check_constraints(base)
         return atom
 
+    @staticmethod
+    def from_python(base, value):
+        value = ovs.db.parser.float_to_int(value)
+        if type(value) in base.type.python_types:
+            atom = Atom(base.type, value)
+        else:
+            raise error.Error("expected %s, got %s" % (base.type, type(value)))
+        atom.check_constraints(base)
+        return atom
+
     def check_constraints(self, base):
         """Checks whether 'atom' meets the constraints (if any) defined in
         'base' and raises an ovs.db.error.Error if any constraint is violated.
 
         'base' and 'atom' must have the same type.
-
         Checking UUID constraints is deferred to transaction commit time, so
         this function does nothing for UUID constraints."""
         assert base.type == self.type
@@ -363,6 +384,9 @@ class Datum(object):
         else:
             return [k.value for k in self.values.iterkeys()]
         
+    def as_dict(self):
+        return dict(self.values)
+
     def as_scalar(self):
         if len(self.values) == 1:
             if self.type.is_map():
@@ -373,6 +397,97 @@ class Datum(object):
         else:
             return None
 
+    def to_python(self, uuid_to_row):
+        """Returns this datum's value converted into a natural Python
+        representation of this datum's type, according to the following
+        rules:
+
+        - If the type has exactly one value and it is not a map (that is,
+          self.type.is_scalar() returns True), then the value is:
+
+            * An int or long, for an integer column.
+
+            * An int or long or float, for a real column.
+
+            * A bool, for a boolean column.
+
+            * A str or unicode object, for a string column.
+
+            * A uuid.UUID object, for a UUID column without a ref_table.
+
+            * An object represented the referenced row, for a UUID column with
+              a ref_table.  (For the Idl, this object will be an ovs.db.idl.Row
+              object.)
+
+          If some error occurs (e.g. the database server's idea of the column
+          is different from the IDL's idea), then the default value for the
+          scalar type is used (see Atom.default()).
+
+        - Otherwise, if the type is not a map, then the value is a Python list
+          whose elements have the types described above.
+
+        - Otherwise, the type is a map, and the value is a Python dict that
+          maps from key to value, with key and value types determined as
+          described above.
+
+        'uuid_to_row' must be a function that takes a value and an
+        ovs.db.types.BaseType and translates UUIDs into row objects."""
+        if self.type.is_scalar():
+            value = uuid_to_row(self.as_scalar(), self.type.key)
+            if value is None:
+                return self.type.key.default()
+            else:
+                return value
+        elif self.type.is_map():
+            value = {}
+            for k, v in self.values.iteritems():
+                dk = uuid_to_row(k.value, self.type.key)
+                dv = uuid_to_row(v.value, self.type.value)
+                if dk is not None and dv is not None:
+                    value[dk] = dv
+            return value
+        else:
+            s = set()
+            for k in self.values:
+                dk = uuid_to_row(k.value, self.type.key)
+                if dk is not None:
+                    s.add(dk)
+            return sorted(s)
+
+    @staticmethod
+    def from_python(type_, value, row_to_uuid):
+        """Returns a new Datum with the given ovs.db.types.Type 'type_'.  The
+        new datum's value is taken from 'value', which must take the form
+        described as a valid return value from Datum.to_python() for 'type'.
+
+        Each scalar value within 'value' is initally passed through
+        'row_to_uuid', which should convert objects that represent rows (if
+        any) into uuid.UUID objects and return other data unchanged.
+
+        Raises ovs.db.error.Error if 'value' is not in an appropriate form for
+        'type_'."""
+        d = {}
+        if type(value) == dict:
+            for k, v in value.iteritems():
+                ka = Atom.from_python(type_.key, row_to_uuid(k))
+                va = Atom.from_python(type_.value, row_to_uuid(v))
+                d[ka] = va
+        elif type(value) in (list, tuple):
+            for k in value:
+                ka = Atom.from_python(type_.key, row_to_uuid(k))
+                d[ka] = None
+        else:
+            ka = Atom.from_python(type_.key, row_to_uuid(value))
+            d[ka] = None
+
+        datum = Datum(type_, d)
+        datum.check_constraints()
+        if not datum.conforms_to_type():
+            raise error.Error("%d values when type requires between %d and %d"
+                              % (len(d), type_.n_min, type_.n_max))
+
+        return datum
+
     def __getitem__(self, key):
         if not isinstance(key, Atom):
             key = Atom.new(key)
index 7841a89..d01fde8 100644 (file)
 # limitations under the License.
 
 import logging
+import uuid
 
 import ovs.jsonrpc
 import ovs.db.parser
 import ovs.db.schema
 from ovs.db import error
 import ovs.ovsuuid
+import ovs.poller
 
 class Idl:
     """Open vSwitch Database Interface Definition Language (OVSDB IDL).
@@ -33,36 +35,93 @@ class Idl:
     JSON-RPC requests and reports to the client whether the transaction
     completed successfully.
 
-    If 'schema_cb' is provided, it should be a callback function that accepts
-    an ovs.db.schema.DbSchema as its argument.  It should determine whether the
-    schema is acceptable and raise an ovs.db.error.Error if it is not.  It may
-    also delete any tables or columns from the schema that the client has no
-    interest in monitoring, to save time and bandwidth during monitoring.  Its
-    return value is ignored."""
+    The client is allowed to access the following attributes directly, in a
+    read-only fashion:
 
-    def __init__(self, remote, db_name, schema_cb=None):
+    - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
+      to the Idl constructor.  Each ovs.db.schema.TableSchema in the map is
+      annotated with a new attribute 'rows', which is a dict from a uuid.UUID
+      to a Row object.
+
+      The client may directly read and write the Row objects referenced by the
+      'rows' map values.  Refer to Row for more details.
+
+    - 'change_seqno': A number that represents the IDL's state.  When the IDL
+      is updated (by Idl.run()), its value changes.
+
+    - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
+      if no lock is configured.
+
+    - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
+      lock, and False otherwise.
+
+      Locking and unlocking happens asynchronously from the database client's
+      point of view, so the information is only useful for optimization
+      (e.g. if the client doesn't have the lock then there's no point in trying
+      to write to the database).
+
+    - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
+      the database server has indicated that some other client already owns the
+      requested lock, and False otherwise.
+
+    - 'txn': The ovs.db.idl.Transaction object for the database transaction
+      currently being constructed, if there is one, or None otherwise.
+"""
+
+    def __init__(self, remote, schema):
         """Creates and returns a connection to the database named 'db_name' on
         'remote', which should be in a form acceptable to
         ovs.jsonrpc.session.open().  The connection will maintain an in-memory
-        replica of the remote database."""
-        self.remote = remote
-        self.session = ovs.jsonrpc.Session.open(remote)
-        self.db_name = db_name
-        self.last_seqno = None
-        self.schema = None
-        self.state = None
+        replica of the remote database.
+
+        'schema' should be the schema for the remote database.  The caller may
+        have cut it down by removing tables or columns that are not of
+        interest.  The IDL will only replicate the tables and columns that
+        remain.  The caller may also add a attribute named 'alert' to selected
+        remaining columns, setting its value to False; if so, then changes to
+        those columns will not be considered changes to the database for the
+        purpose of the return value of Idl.run() and Idl.change_seqno.  This is
+        useful for columns that the IDL's client will write but not read.
+
+        The IDL uses and modifies 'schema' directly."""
+
+        self.tables = schema.tables
+        self._db = schema
+        self._session = ovs.jsonrpc.Session.open(remote)
+        self._monitor_request_id = None
+        self._last_seqno = None
         self.change_seqno = 0
-        self.data = {}
-        self.schema_cb = schema_cb
+
+        # Database locking.
+        self.lock_name = None          # Name of lock we need, None if none.
+        self.has_lock = False          # Has db server said we have the lock?
+        self.is_lock_contended = False # Has db server said we can't get lock?
+        self._lock_request_id = None   # JSON-RPC ID of in-flight lock request.
+
+        # Transaction support.
+        self.txn = None
+        self._outstanding_txns = {}
+
+        for table in schema.tables.itervalues():
+            for column in table.columns.itervalues():
+                if not hasattr(column, 'alert'):
+                    column.alert = True
+            table.need_table = False
+            table.rows = {}
+            table.idl = self
 
     def close(self):
-        self.session.close()
+        """Closes the connection to the database.  The IDL will no longer
+        update."""
+        self._session.close()
 
     def run(self):
         """Processes a batch of messages from the database server.  Returns
         True if the database as seen through the IDL changed, False if it did
         not change.  The initial fetch of the entire contents of the remote
-        database is considered to be one kind of change.
+        database is considered to be one kind of change.  If the IDL has been
+        configured to acquire a database lock (with Idl.set_lock()), then
+        successfully acquiring the lock is also considered to be a change.
 
         This function can return occasional false positives, that is, report
         that the database changed even though it didn't.  This happens if the
@@ -73,110 +132,192 @@ class Idl:
         not supposed to do that.)
 
         As an alternative to checking the return value, the client may check
-        for changes in the value returned by self.get_seqno()."""
+        for changes in self.change_seqno."""
+        assert not self.txn
         initial_change_seqno = self.change_seqno
-        self.session.run()
-        if self.session.is_connected():
-            seqno = self.session.get_seqno()
-            if seqno != self.last_seqno:
-                self.last_seqno = seqno
-                self.state = (self.__send_schema_request, None)
-            if self.state:
-                self.state[0]()
+        self._session.run()
+        i = 0
+        while i < 50:
+            i += 1
+            if not self._session.is_connected():
+                break
+
+            seqno = self._session.get_seqno()
+            if seqno != self._last_seqno:
+                self._last_seqno = seqno
+                self.__txn_abort_all()
+                self.__send_monitor_request()
+                if self.lock_name:
+                    self.__send_lock_request()
+                break
+
+            msg = self._session.recv()
+            if msg is None:
+                break
+            if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
+                and msg.method == "update"
+                and len(msg.params) == 2
+                and msg.params[0] == None):
+                # Database contents changed.
+                self.__parse_update(msg.params[1])
+            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
+                  and self._monitor_request_id is not None
+                  and self._monitor_request_id == msg.id):
+                # Reply to our "monitor" request.
+                try:
+                    self.change_seqno += 1
+                    self._monitor_request_id = None
+                    self.__clear()
+                    self.__parse_update(msg.result)
+                except error.Error, e:
+                    logging.error("%s: parse error in received schema: %s"
+                                  % (self._session.get_name(), e))
+                    self.__error()
+            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
+                  and self._lock_request_id is not None
+                  and self._lock_request_id == msg.id):
+                # Reply to our "lock" request.
+                self.__parse_lock_reply(msg.result)
+            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
+                  and msg.method == "locked"):
+                # We got our lock.
+                self.__parse_lock_notify(msg.params, True)
+            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
+                  and msg.method == "stolen"):
+                # Someone else stole our lock.
+                self.__parse_lock_notify(msg.params, False)
+            elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
+                # Reply to our echo request.  Ignore it.
+                pass
+            elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
+                               ovs.jsonrpc.Message.T_REPLY)
+                  and self.__txn_process_reply(msg)):
+                # __txn_process_reply() did everything needed.
+                pass
+            else:
+                # This can happen if a transaction is destroyed before we
+                # receive the reply, so keep the log level low.
+                logging.debug("%s: received unexpected %s message"
+                              % (self._session.get_name(),
+                                 ovs.jsonrpc.Message.type_to_string(msg.type)))
+
         return initial_change_seqno != self.change_seqno
 
     def wait(self, poller):
         """Arranges for poller.block() to wake up when self.run() has something
         to do or when activity occurs on a transaction on 'self'."""
-        self.session.wait(poller)
-        if self.state and self.state[1]:
-            self.state[1](poller)
-
-    def get_seqno(self):
-        """Returns a number that represents the IDL's state.  When the IDL
-        updated (by self.run()), the return value changes."""
-        return self.change_seqno
-        
-    def __send_schema_request(self):
-        msg = ovs.jsonrpc.Message.create_request("get_schema", [self.db_name])
-        self.session.send(msg)
-        self.state = (lambda: self.__recv_schema(msg.id), self.__recv_wait)
-
-    def __recv_schema(self, id):
-        msg = self.session.recv()
-        if msg and msg.type == ovs.jsonrpc.Message.T_REPLY and msg.id == id:
-            try:
-                self.schema = ovs.db.schema.DbSchema.from_json(msg.result)
-            except error.Error, e:
-                logging.error("%s: parse error in received schema: %s"
-                              % (self.remote, e))
-                self.__error()
-                return
+        self._session.wait(poller)
+        self._session.recv_wait(poller)
 
-            if self.schema_cb:
-                try:
-                    self.schema_cb(self.schema)
-                except error.Error, e:
-                    logging.error("%s: error validating schema: %s"
-                                  % (self.remote, e))
-                    self.__error()
-                    return
+    def has_ever_connected(self):
+        """Returns True, if the IDL successfully connected to the remote
+        database and retrieved its contents (even if the connection
+        subsequently dropped and is in the process of reconnecting).  If so,
+        then the IDL contains an atomic snapshot of the database's contents
+        (but it might be arbitrarily old if the connection dropped).
+
+        Returns False if the IDL has never connected or retrieved the
+        database's contents.  If so, the IDL is empty."""
+        return self.change_seqno != 0
+
+    def force_reconnect(self):
+        """Forces the IDL to drop its connection to the database and reconnect.
+        In the meantime, the contents of the IDL will not change."""
+        self._session.force_reconnect()
+
+    def set_lock(self, lock_name):
+        """If 'lock_name' is not None, configures the IDL to obtain the named
+        lock from the database server and to avoid modifying the database when
+        the lock cannot be acquired (that is, when another client has the same
+        lock).
+
+        If 'lock_name' is None, drops the locking requirement and releases the
+        lock."""
+        assert not self.txn
+        assert not self._outstanding_txns
+
+        if self.lock_name and (not lock_name or lock_name != self.lock_name):
+            # Release previous lock.
+            self.__send_unlock_request()
+            self.lock_name = None
+            self.is_lock_contended = False
+
+        if lock_name and not self.lock_name:
+            # Acquire new lock.
+            self.lock_name = lock_name
+            self.__send_lock_request()
+
+    def __clear(self):
+        changed = False
+
+        for table in self.tables.itervalues():
+            if table.rows:
+                changed = True
+                table.rows = {}
 
-            self.__send_monitor_request()
-        elif msg:
-            logging.error("%s: unexpected message expecting schema: %s"
-                          % (self.remote, msg))
-            self.__error()
-            
-    def __recv_wait(self, poller):
-        self.session.recv_wait(poller)
+        if changed:
+            self.change_seqno += 1
+
+    def __update_has_lock(self, new_has_lock):
+        if new_has_lock and not self.has_lock:
+            if self._monitor_request_id is None:
+                self.change_seqno += 1
+            else:
+                # We're waiting for a monitor reply, so don't signal that the
+                # database changed.  The monitor reply will increment
+                # change_seqno anyhow.
+                pass
+            self.is_lock_contended = False
+        self.has_lock = new_has_lock
+
+    def __do_send_lock_request(self, method):
+        self.__update_has_lock(False)
+        self._lock_request_id = None
+        if self._session.is_connected():
+            msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
+            msg_id = msg.id
+            self._session.send(msg)
+        else:
+            msg_id = None
+        return msg_id
+
+    def __send_lock_request(self):
+        self._lock_request_id = self.__do_send_lock_request("lock")
+
+    def __send_unlock_request(self):
+        self.__do_send_lock_request("unlock")
+
+    def __parse_lock_reply(self, result):
+        self._lock_request_id = None
+        got_lock = type(result) == dict and result.get("locked") is True
+        self.__update_has_lock(got_lock)
+        if not got_lock:
+            self.is_lock_contended = True
+
+    def __parse_lock_notify(self, params, new_has_lock):
+        if (self.lock_name is not None
+            and type(params) in (list, tuple)
+            and params
+            and params[0] == self.lock_name):
+            self.__update_has_lock(self, new_has_lock)
+            if not new_has_lock:
+                self.is_lock_contended = True
 
     def __send_monitor_request(self):
         monitor_requests = {}
-        for table in self.schema.tables.itervalues():
+        for table in self.tables.itervalues():
             monitor_requests[table.name] = {"columns": table.columns.keys()}
         msg = ovs.jsonrpc.Message.create_request(
-            "monitor", [self.db_name, None, monitor_requests])
-        self.session.send(msg)
-        self.state = (lambda: self.__recv_monitor_reply(msg.id),
-                      self.__recv_wait)
-
-    def __recv_monitor_reply(self, id):
-        msg = self.session.recv()
-        if msg and msg.type == ovs.jsonrpc.Message.T_REPLY and msg.id == id:
-            try:
-                self.change_seqno += 1
-                self.state = (self.__recv_update, self.__recv_wait)
-                self.__clear()
-                self.__parse_update(msg.result)
-            except error.Error, e:
-                logging.error("%s: parse error in received schema: %s"
-                              % (self.remote, e))
-                self.__error()
-        elif msg:
-            logging.error("%s: unexpected message expecting schema: %s"
-                          % (self.remote, msg))
-            self.__error()
-
-    def __recv_update(self):
-        msg = self.session.recv()
-        if (msg and msg.type == ovs.jsonrpc.Message.T_NOTIFY and
-            type(msg.params) == list and len(msg.params) == 2 and
-            msg.params[0] is None):
-            self.__parse_update(msg.params[1])
-        elif msg:
-            logging.error("%s: unexpected message expecting update: %s"
-                          % (self.remote, msg))
-            self.__error()
-
-    def __error(self):
-        self.session.force_reconnect()
+            "monitor", [self._db.name, None, monitor_requests])
+        self._monitor_request_id = msg.id
+        self._session.send(msg)
 
     def __parse_update(self, update):
         try:
             self.__do_parse_update(update)
         except error.Error, e:
-            logging.error("%s: error parsing update: %s" % (self.remote, e))
+            logging.error("%s: error parsing update: %s"
+                          % (self._session.get_name(), e))
 
     def __do_parse_update(self, table_updates):
         if type(table_updates) != dict:
@@ -184,7 +325,7 @@ class Idl:
                               table_updates)
 
         for table_name, table_update in table_updates.iteritems():
-            table = self.schema.tables.get(table_name)
+            table = self.tables.get(table_name)
             if not table:
                 raise error.Error('<table-updates> includes unknown '
                                   'table "%s"' % table_name)
@@ -216,17 +357,17 @@ class Idl:
                     raise error.Error('<row-update> missing "old" and '
                                       '"new" members', row_update)
 
-                if self.__parse_row_update(table, uuid, old, new):
+                if self.__process_update(table, uuid, old, new):
                     self.change_seqno += 1
 
-    def __parse_row_update(self, table, uuid, old, new):
+    def __process_update(self, table, uuid, old, new):
         """Returns True if a column changed, False otherwise."""
-        row = self.data[table.name].get(uuid)
+        row = table.rows.get(uuid)
         changed = False
         if not new:
             # Delete row.
             if row:
-                del self.data[table.name][uuid]
+                del table.rows[uuid]
                 changed = True
             else:
                 # XXX rate-limit
@@ -241,7 +382,7 @@ class Idl:
                 # XXX rate-limit
                 logging.warning("cannot add existing row %s to table %s"
                                 % (uuid, table.name))
-            if self.__modify_row(table, row, new):
+            if self.__row_update(table, row, new):
                 changed = True
         else:
             if not row:
@@ -250,11 +391,11 @@ class Idl:
                 # XXX rate-limit
                 logging.warning("cannot modify missing row %s in table %s"
                                 % (uuid, table.name))
-            if self.__modify_row(table, row, new):
+            if self.__row_update(table, row, new):
                 changed = True
         return changed
 
-    def __modify_row(self, table, row, row_json):
+    def __row_update(self, table, row, row_json):
         changed = False
         for column_name, datum_json in row_json.iteritems():
             column = table.columns.get(column_name)
@@ -272,40 +413,641 @@ class Idl:
                                 % (column_name, table.name, e))
                 continue
 
-            if datum != getattr(row, column_name):
-                setattr(row, column_name, datum)
-                changed = True
+            if datum != row._data[column_name]:
+                row._data[column_name] = datum
+                if column.alert:
+                    changed = True
             else:
                 # Didn't really change but the OVSDB monitor protocol always
                 # includes every value in a row.
                 pass
         return changed
 
-    def __clear(self):
-        if self.data != {}:
-            for table_name in self.schema.tables:
-                if self.data[table_name] != {}:
-                    self.change_seqno += 1
-                    break
-
-        self.data = {}
-        for table_name in self.schema.tables:
-            self.data[table_name] = {}
-
     def __create_row(self, table, uuid):
-        row = self.data[table.name][uuid] = Row()
+        data = {}
         for column in table.columns.itervalues():
-            setattr(row, column.name, ovs.db.data.Datum.default(column.type))
+            data[column.name] = ovs.db.data.Datum.default(column.type)
+        row = table.rows[uuid] = Row(self, table, uuid, data)
         return row
 
-    def force_reconnect(self):
-        """Forces the IDL to drop its connection to the database and reconnect.
-        In the meantime, the contents of the IDL will not change."""
-        self.session.force_reconnect()
+    def __error(self):
+        self._session.force_reconnect()
+
+    def __txn_abort_all(self):
+        while self._outstanding_txns:
+            txn = self._outstanding_txns.popitem()[1]
+            txn._status = Transaction.TRY_AGAIN
+
+    def __txn_process_reply(self, msg):
+        txn = self._outstanding_txns.pop(msg.id, None)
+        if txn:
+            txn._process_reply(msg)
+
+def _uuid_to_row(atom, base):
+    if base.ref_table:
+        return base.ref_table.rows.get(atom)
+    else:
+        return atom
+
+def _row_to_uuid(value):
+    if type(value) == Row:
+        return value.uuid
+    else:
+        return value
 
 class Row(object):
-    """A row within an Idl.  Data for each column is stored as an attribute
-    with the same name as the column and using an ovs.db.data.Datum as the
-    value."""
-    pass
+    """A row within an IDL.
+
+    The client may access the following attributes directly:
+
+    - 'uuid': a uuid.UUID object whose value is the row's database UUID.
+
+    - An attribute for each column in the Row's table, named for the column,
+      whose values are as returned by Datum.to_python() for the column's type.
+
+      If some error occurs (e.g. the database server's idea of the column is
+      different from the IDL's idea), then the attribute values is the
+      "default" value return by Datum.default() for the column's type.  (It is
+      important to know this because the default value may violate constraints
+      for the column's type, e.g. the default integer value is 0 even if column
+      contraints require the column's value to be positive.)
+
+      When a transaction is active, column attributes may also be assigned new
+      values.  Committing the transaction will then cause the new value to be
+      stored into the database.
+
+      *NOTE*: In the current implementation, the value of a column is a *copy*
+      of the value in the database.  This means that modifying its value
+      directly will have no useful effect.  For example, the following:
+        row.mycolumn["a"] = "b"              # don't do this
+      will not change anything in the database, even after commit.  To modify
+      the column, instead assign the modified column value back to the column:
+        d = row.mycolumn
+        d["a"] = "b"
+        row.mycolumn = d
+"""
+    def __init__(self, idl, table, uuid, data):
+        # All of the explicit references to self.__dict__ below are required
+        # to set real attributes with invoking self.__getattr__().
+        self.__dict__["uuid"] = uuid
+
+        self.__dict__["_idl"] = idl
+        self.__dict__["_table"] = table
+
+        # _data is the committed data.  It takes the following values:
+        #
+        #   - A dictionary that maps every column name to a Datum, if the row
+        #     exists in the committed form of the database.
+        #
+        #   - None, if this row is newly inserted within the active transaction
+        #     and thus has no committed form.
+        self.__dict__["_data"] = data
+
+        # _changes describes changes to this row within the active transaction.
+        # It takes the following values:
+        #
+        #   - {}, the empty dictionary, if no transaction is active or if the
+        #     row has yet not been changed within this transaction.
+        #
+        #   - A dictionary that maps a column name to its new Datum, if an
+        #     active transaction changes those columns' values.
+        #
+        #   - A dictionary that maps every column name to a Datum, if the row
+        #     is newly inserted within the active transaction.
+        #
+        #   - None, if this transaction deletes this row.
+        self.__dict__["_changes"] = {}
+
+        # A dictionary whose keys are the names of columns that must be
+        # verified as prerequisites when the transaction commits.  The values
+        # in the dictionary are all None.
+        self.__dict__["_prereqs"] = {}
+
+    def __getattr__(self, column_name):
+        assert self._changes is not None
+
+        datum = self._changes.get(column_name)
+        if datum is None:
+            datum = self._data[column_name]
+
+        return datum.to_python(_uuid_to_row)
+
+    def __setattr__(self, column_name, value):
+        assert self._changes is not None
+        assert self._idl.txn
+
+        column = self._table.columns[column_name]
+        try:
+            datum = ovs.db.data.Datum.from_python(column.type, value,
+                                                  _row_to_uuid)
+        except error.Error, e:
+            # XXX rate-limit
+            logging.error("attempting to write bad value to column %s (%s)"
+                          % (column_name, e))
+            return
+        self._idl.txn._write(self, column, datum)
+
+    def verify(self, column_name):
+        """Causes the original contents of column 'column_name' in this row to
+        be verified as a prerequisite to completing the transaction.  That is,
+        if 'column_name' changed in this row (or if this row was deleted)
+        between the time that the IDL originally read its contents and the time
+        that the transaction commits, then the transaction aborts and
+        Transaction.commit() returns Transaction.TRY_AGAIN.
+
+        The intention is that, to ensure that no transaction commits based on
+        dirty reads, an application should call Row.verify() on each data item
+        read as part of a read-modify-write operation.
+
+        In some cases Row.verify() reduces to a no-op, because the current
+        value of the column is already known:
+
+          - If this row is a row created by the current transaction (returned
+            by Transaction.insert()).
+
+          - If the column has already been modified within the current
+            transaction.
+
+        Because of the latter property, always call Row.verify() *before*
+        modifying the column, for a given read-modify-write.
+
+        A transaction must be in progress."""
+        assert self._idl.txn
+        assert self._changes is not None
+        if not self._data or column_name in self._changes:
+            return
+
+        self._prereqs[column_name] = None
+
+    def delete(self):
+        """Deletes this row from its table.
+
+        A transaction must be in progress."""
+        assert self._idl.txn
+        assert self._changes is not None
+        if self._data is None:
+            del self._idl.txn._txn_rows[self.uuid]
+        self.__dict__["_changes"] = None
+        del self._table.rows[self.uuid]
+
+def _uuid_name_from_uuid(uuid):
+    return "row%s" % str(uuid).replace("-", "_")
+
+def _where_uuid_equals(uuid):
+    return [["_uuid", "==", ["uuid", str(uuid)]]]
+
+class _InsertedRow(object):
+    def __init__(self, op_index):
+        self.op_index = op_index
+        self.real = None
+
+class Transaction(object):
+    # Status values that Transaction.commit() can return.
+    UNCOMMITTED = "uncommitted" # Not yet committed or aborted.
+    UNCHANGED = "unchanged"     # Transaction didn't include any changes.
+    INCOMPLETE = "incomplete"   # Commit in progress, please wait.
+    ABORTED = "aborted"         # ovsdb_idl_txn_abort() called.
+    SUCCESS = "success"         # Commit successful.
+    TRY_AGAIN = "try again"     # Commit failed because a "verify" operation
+                                # reported an inconsistency, due to a network
+                                # problem, or other transient failure.
+    NOT_LOCKED = "not locked"   # Server hasn't given us the lock yet.
+    ERROR = "error"             # Commit failed due to a hard error.
+
+    @staticmethod
+    def status_to_string(status):
+        """Converts one of the status values that Transaction.commit() can
+        return into a human-readable string.
+
+        (The status values are in fact such strings already, so
+        there's nothing to do.)"""
+        return status
+
+    def __init__(self, idl):
+        """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
+        A given Idl may only have a single active transaction at a time.
+
+        A Transaction may modify the contents of a database by assigning new
+        values to columns (attributes of Row), deleting rows (with
+        Row.delete()), or inserting rows (with Transaction.insert()).  It may
+        also check that columns in the database have not changed with
+        Row.verify().
+
+        When a transaction is complete (which must be before the next call to
+        Idl.run()), call Transaction.commit() or Transaction.abort()."""
+        assert idl.txn is None
+
+        idl.txn = self
+        self._request_id = None
+        self.idl = idl
+        self.dry_run = False
+        self._txn_rows = {}
+        self._status = Transaction.UNCOMMITTED
+        self._error = None
+        self._comments = []
+
+        self._inc_table = None
+        self._inc_column = None
+        self._inc_where = None
+
+        self._inserted_rows = {} # Map from UUID to _InsertedRow
+
+    def add_comment(self, comment):
+        """Appens 'comment' to the comments that will be passed to the OVSDB
+        server when this transaction is committed.  (The comment will be
+        committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
+        relatively human-readable form.)"""
+        self._comments.append(comment)
+
+    def increment(self, table, column, where):
+        assert not self._inc_table
+        self._inc_table = table
+        self._inc_column = column
+        self._inc_where = where
+
+    def wait(self, poller):
+        if self._status not in (Transaction.UNCOMMITTED,
+                                Transaction.INCOMPLETE):
+            poller.immediate_wake()
+
+    def _substitute_uuids(self, json):
+        if type(json) in (list, tuple):
+            if (len(json) == 2
+                and json[0] == 'uuid'
+                and ovs.ovsuuid.is_valid_string(json[1])):
+                uuid = ovs.ovsuuid.from_string(json[1])
+                row = self._txn_rows.get(uuid, None)
+                if row and row._data is None:
+                    return ["named-uuid", _uuid_name_from_uuid(uuid)]
+        return json
+
+    def __disassemble(self):
+        self.idl.txn = None
+
+        for row in self._txn_rows.itervalues():
+            if row._changes is None:
+                row._table.rows[row.uuid] = row
+            elif row._data is None:
+                del row._table.rows[row.uuid]
+            row.__dict__["_changes"] = {}
+            row.__dict__["_prereqs"] = {}
+        self._txn_rows = {}
+
+    def commit(self):
+        """Attempts to commit this transaction and returns the status of the
+        commit operation, one of the constants declared as class attributes.
+        If the return value is Transaction.INCOMPLETE, then the transaction is
+        not yet complete and the caller should try calling again later, after
+        calling Idl.run() to run the Idl.
+
+        Committing a transaction rolls back all of the changes that it made to
+        the Idl's copy of the database.  If the transaction commits
+        successfully, then the database server will send an update and, thus,
+        the Idl will be updated with the committed changes."""
+        # The status can only change if we're the active transaction.
+        # (Otherwise, our status will change only in Idl.run().)
+        if self != self.idl.txn:
+            return self._status
+
+        # If we need a lock but don't have it, give up quickly.
+        if self.idl.lock_name and not self.idl.has_lock():
+            self._status = Transaction.NOT_LOCKED
+            self.__disassemble()
+            return self._status
+
+        operations = [self.idl._db.name]
+
+        # Assert that we have the required lock (avoiding a race).
+        if self.idl.lock_name:
+            operations.append({"op": "assert",
+                               "lock": self.idl.lock_name})
+
+        # Add prerequisites and declarations of new rows.
+        for row in self._txn_rows.itervalues():
+            if row._prereqs:
+                rows = {}
+                columns = []
+                for column_name in row._prereqs:
+                    columns.append(column_name)
+                    rows[column_name] = row._data[column_name].to_json()
+                operations.append({"op": "wait",
+                                   "table": row._table.name,
+                                   "timeout": 0,
+                                   "where": _where_uuid_equals(row.uuid),
+                                   "until": "==",
+                                   "columns": columns,
+                                   "rows": [rows]})
+
+        # Add updates.
+        any_updates = False
+        for row in self._txn_rows.itervalues():
+            if row._changes is None:
+                if row._table.is_root:
+                    operations.append({"op": "delete",
+                                       "table": row._table.name,
+                                       "where": _where_uuid_equals(row.uuid)})
+                    any_updates = True
+                else:
+                    # Let ovsdb-server decide whether to really delete it.
+                    pass
+            elif row._changes:
+                op = {"table": row._table.name}
+                if row._data is None:
+                    op["op"] = "insert"
+                    op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
+                    any_updates = True
+
+                    op_index = len(operations) - 1
+                    self._inserted_rows[row.uuid] = _InsertedRow(op_index)
+                else:
+                    op["op"] = "update"
+                    op["where"] = _where_uuid_equals(row.uuid)
+
+                row_json = {}
+                op["row"] = row_json
+
+                for column_name, datum in row._changes.iteritems():
+                    if row._data is not None or not datum.is_default():
+                        row_json[column_name] = self._substitute_uuids(datum.to_json())
+
+                        # If anything really changed, consider it an update.
+                        # We can't suppress not-really-changed values earlier
+                        # or transactions would become nonatomic (see the big
+                        # comment inside Transaction._write()).
+                        if (not any_updates and row._data is not None and
+                            row._data[column_name] != datum):
+                            any_updates = True
+
+                if row._data is None or row_json:
+                    operations.append(op)
+
+        # Add increment.
+        if self._inc_table and any_updates:
+            self._inc_index = len(operations) - 1
+
+            operations.append({"op": "mutate",
+                               "table": self._inc_table,
+                               "where": self._substitute_uuids(self._inc_where),
+                               "mutations": [[self._inc_column, "+=", 1]]})
+            operations.append({"op": "select",
+                               "table": self._inc_table,
+                               "where": self._substitute_uuids(self._inc_where),
+                               "columns": [self._inc_column]})
+
+        # Add comment.
+        if self._comments:
+            operations.append({"op": "comment",
+                               "comment": "\n".join(self._comments)})
+
+        # Dry run?
+        if self.dry_run:
+            operations.append({"op": "abort"})
+
+        if not any_updates:
+            self._status = Transaction.UNCHANGED
+        else:
+            msg = ovs.jsonrpc.Message.create_request("transact", operations)
+            self._request_id = msg.id
+            if not self.idl._session.send(msg):
+                self.idl._outstanding_txns[self._request_id] = self
+                self._status = Transaction.INCOMPLETE
+            else:
+                self._status = Transaction.TRY_AGAIN
+
+        self.__disassemble()
+        return self._status
+
+    def commit_block(self):
+        while True:
+            status = self.commit()
+            if status != Transaction.INCOMPLETE:
+                return status
+
+            self.idl.run()
+
+            poller = ovs.poller.Poller()
+            self.idl.wait(poller)
+            self.wait(poller)
+            poller.block()
+
+    def get_increment_new_value(self):
+        assert self._status == Transaction.SUCCESS
+        return self._inc_new_value
+
+    def abort(self):
+        """Aborts this transaction.  If Transaction.commit() has already been
+        called then the transaction might get committed anyhow."""
+        self.__disassemble()
+        if self._status in (Transaction.UNCOMMITTED,
+                            Transaction.INCOMPLETE):
+            self._status = Transaction.ABORTED
+
+    def get_error(self):
+        """Returns a string representing this transaction's current status,
+        suitable for use in log messages."""
+        if self._status != Transaction.ERROR:
+            return Transaction.status_to_string(self._status)
+        elif self._error:
+            return self._error
+        else:
+            return "no error details available"
+
+    def __set_error_json(self, json):
+        if self._error is None:
+            self._error = ovs.json.to_string(json)
+
+    def get_insert_uuid(self, uuid):
+        """Finds and returns the permanent UUID that the database assigned to a
+        newly inserted row, given the UUID that Transaction.insert() assigned
+        locally to that row.
+
+        Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
+        or if it was assigned by that function and then deleted by Row.delete()
+        within the same transaction.  (Rows that are inserted and then deleted
+        within a single transaction are never sent to the database server, so
+        it never assigns them a permanent UUID.)
+
+        This transaction must have completed successfully."""
+        assert self._status in (Transaction.SUCCESS,
+                                Transaction.UNCHANGED)
+        inserted_row = self._inserted_rows.get(uuid)
+        if inserted_row:
+            return inserted_row.real
+        return None
+
+    def _write(self, row, column, datum):
+        assert row._changes is not None
+
+        txn = row._idl.txn
+
+        # If this is a write-only column and the datum being written is the
+        # same as the one already there, just skip the update entirely.  This
+        # is worth optimizing because we have a lot of columns that get
+        # periodically refreshed into the database but don't actually change
+        # that often.
+        #
+        # We don't do this for read/write columns because that would break
+        # atomicity of transactions--some other client might have written a
+        # different value in that column since we read it.  (But if a whole
+        # transaction only does writes of existing values, without making any
+        # real changes, we will drop the whole transaction later in
+        # ovsdb_idl_txn_commit().)
+        if not column.alert and row._data.get(column.name) == datum:
+            new_value = row._changes.get(column.name)
+            if new_value is None or new_value == datum:
+                return
+
+        txn._txn_rows[row.uuid] = row
+        row._changes[column.name] = datum.copy()
+
+    def insert(self, table, new_uuid=None):
+        """Inserts and returns a new row in 'table', which must be one of the
+        ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
+
+        The new row is assigned a provisional UUID.  If 'uuid' is None then one
+        is randomly generated; otherwise 'uuid' should specify a randomly
+        generated uuid.UUID not otherwise in use.  ovsdb-server will assign a
+        different UUID when 'txn' is committed, but the IDL will replace any
+        uses of the provisional UUID in the data to be to be committed by the
+        UUID assigned by ovsdb-server."""
+        assert self._status == Transaction.UNCOMMITTED
+        if new_uuid is None:
+            new_uuid = uuid.uuid4()
+        row = Row(self.idl, table, new_uuid, None)
+        table.rows[row.uuid] = row
+        self._txn_rows[row.uuid] = row
+        return row
+
+    def _process_reply(self, msg):
+        if msg.type == ovs.jsonrpc.Message.T_ERROR:
+            self._status = Transaction.ERROR
+        elif type(msg.result) not in (list, tuple):
+            # XXX rate-limit
+            logging.warning('reply to "transact" is not JSON array')
+        else:
+            hard_errors = False
+            soft_errors = False
+            lock_errors = False
+
+            ops = msg.result
+            for op in ops:
+                if op is None:
+                    # This isn't an error in itself but indicates that some
+                    # prior operation failed, so make sure that we know about
+                    # it.
+                    soft_errors = True
+                elif type(op) == dict:
+                    error = op.get("error")
+                    if error is not None:
+                        if error == "timed out":
+                            soft_errors = True
+                        elif error == "not owner":
+                            lock_errors = True
+                        elif error == "aborted":
+                            pass
+                        else:
+                            hard_errors = True
+                            self.__set_error_json(op)
+                else:
+                    hard_errors = True
+                    self.__set_error_json(op)
+                    # XXX rate-limit
+                    logging.warning("operation reply is not JSON null or "
+                                    "object")
+
+            if not soft_errors and not hard_errors and not lock_errors:
+                if self._inc_table and not self.__process_inc_reply(ops):
+                    hard_errors = True
+
+                for insert in self._inserted_rows.itervalues():
+                    if not self.__process_insert_reply(insert, ops):
+                        hard_errors = True
+
+            if hard_errors:
+                self._status = Transaction.ERROR
+            elif lock_errors:
+                self._status = Transaction.NOT_LOCKED
+            elif soft_errors:
+                self._status = Transaction.TRY_AGAIN
+            else:
+                self._status = Transaction.SUCCESS
+
+    @staticmethod
+    def __check_json_type(json, types, name):
+        if not json:
+            # XXX rate-limit
+            logging.warning("%s is missing" % name)
+            return False
+        elif type(json) not in types:
+            # XXX rate-limit
+            logging.warning("%s has unexpected type %s" % (name, type(json)))
+            return False
+        else:
+            return True
+
+    def __process_inc_reply(self, ops):
+        if self._inc_index + 2 > len(ops):
+            # XXX rate-limit
+            logging.warning("reply does not contain enough operations for "
+                            "increment (has %d, needs %d)" %
+                            (len(ops), self._inc_index + 2))
+
+        # We know that this is a JSON object because the loop in
+        # __process_reply() already checked.
+        mutate = ops[self._inc_index]
+        count = mutate.get("count")
+        if not Transaction.__check_json_type(count, (int, long),
+                                             '"mutate" reply "count"'):
+            return False
+        if count != 1:
+            # XXX rate-limit
+            logging.warning('"mutate" reply "count" is %d instead of 1'
+                            % count)
+            return False
+
+        select = ops[self._inc_index + 1]
+        rows = select.get("rows")
+        if not Transaction.__check_json_type(rows, (list, tuple),
+                                             '"select" reply "rows"'):
+            return False
+        if len(rows) != 1:
+            # XXX rate-limit
+            logging.warning('"select" reply "rows" has %d elements '
+                            'instead of 1' % len(rows))
+            return False
+        row = rows[0]
+        if not Transaction.__check_json_type(row, (dict,),
+                                             '"select" reply row'):
+            return False
+        column = row.get(self._inc_column)
+        if not Transaction.__check_json_type(column, (int, long),
+                                             '"select" reply inc column'):
+            return False
+        self._inc_new_value = column
+        return True
+
+    def __process_insert_reply(self, insert, ops):
+        if insert.op_index >= len(ops):
+            # XXX rate-limit
+            logging.warning("reply does not contain enough operations "
+                            "for insert (has %d, needs %d)"
+                            % (len(ops), insert.op_index))
+            return False
+
+        # We know that this is a JSON object because the loop in
+        # __process_reply() already checked.
+        reply = ops[insert.op_index]
+        json_uuid = reply.get("uuid")
+        if not Transaction.__check_json_type(json_uuid, (tuple, list),
+                                             '"insert" reply "uuid"'):
+            return False
+
+        try:
+            uuid_ = ovs.ovsuuid.from_json(json_uuid)
+        except error.Error:
+            # XXX rate-limit
+            logging.warning('"insert" reply "uuid" is not a JSON UUID')
+            return False
 
+        insert.real = uuid_
+        return True
index 7e18564..675f4ec 100644 (file)
@@ -95,6 +95,9 @@ class DbSchema(object):
             json["version"] = self.version
         return json
 
+    def copy(self):
+        return DbSchema.from_json(self.to_json())
+
     def __follow_ref_table(self, column, base, base_name):
         if not base or base.type != types.UuidType or not base.ref_table_name:
             return
index f313186..6050197 100644 (file)
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import sys
+import uuid
 
 from ovs.db import error
 import ovs.db.parser
@@ -20,9 +21,10 @@ import ovs.db.data
 import ovs.ovsuuid
 
 class AtomicType(object):
-    def __init__(self, name, default):
+    def __init__(self, name, default, python_types):
         self.name = name
         self.default = default
+        self.python_types = python_types
 
     @staticmethod
     def from_string(s):
@@ -51,12 +53,12 @@ class AtomicType(object):
     def default_atom(self):
         return ovs.db.data.Atom(self, self.default)
 
-VoidType = AtomicType("void", None)
-IntegerType = AtomicType("integer", 0)
-RealType = AtomicType("real", 0.0)
-BooleanType = AtomicType("boolean", False)
-StringType = AtomicType("string", "")
-UuidType = AtomicType("uuid", ovs.ovsuuid.zero())
+VoidType = AtomicType("void", None, ())
+IntegerType = AtomicType("integer", 0, (int, long))
+RealType = AtomicType("real", 0.0, (int, long, float))
+BooleanType = AtomicType("boolean", False, (bool,))
+StringType = AtomicType("string", "", (str, unicode))
+UuidType = AtomicType("uuid", ovs.ovsuuid.zero(), (uuid.UUID,))
 
 ATOMIC_TYPES = [VoidType, IntegerType, RealType, BooleanType, StringType,
                 UuidType]
index 9c9f654..0166f0f 100644 (file)
@@ -4,7 +4,7 @@ HAVE_PYTHON='@HAVE_PYTHON@'
 PERL='@PERL@'
 PYTHON='@PYTHON@'
 
-PYTHONPATH=$PYTHONPATH:$abs_top_srcdir/python
+PYTHONPATH=$PYTHONPATH:$abs_top_srcdir/python:$abs_top_builddir/tests
 export PYTHONPATH
 
 PYTHONIOENCODING=utf_8
index 0e135f0..9b094de 100644 (file)
@@ -50,7 +50,6 @@ TESTSUITE_AT = \
        tests/ovsdb-server.at \
        tests/ovsdb-monitor.at \
        tests/ovsdb-idl.at \
-       tests/ovsdb-idl-py.at \
        tests/ovs-vsctl.at \
        tests/interface-reconfigure.at
 TESTSUITE = $(srcdir)/tests/testsuite
@@ -279,15 +278,22 @@ EXTRA_DIST += tests/uuidfilt.pl tests/ovsdb-monitor-sort.pl
 tests_test_ovsdb_LDADD = ovsdb/libovsdb.a lib/libopenvswitch.a $(SSL_LIBS)
 
 # idltest schema and IDL
-OVSIDL_BUILT +=        tests/idltest.c tests/idltest.h tests/idltest.ovsidl
+OVSIDL_BUILT += \
+       tests/idltest.c \
+       tests/idltest.h \
+       tests/idltest.ovsidl \
+       tests/idltest.py
 IDLTEST_IDL_FILES = tests/idltest.ovsschema tests/idltest.ann
 EXTRA_DIST += $(IDLTEST_IDL_FILES)
+CLEANFILES += tests/idltest.pyc tests/idltest.pyo
 tests/idltest.ovsidl: $(IDLTEST_IDL_FILES)
        $(OVSDB_IDLC) -C $(srcdir) annotate $(IDLTEST_IDL_FILES) > $@.tmp
        mv $@.tmp $@
 
 tests/idltest.c: tests/idltest.h
 
+noinst_SCRIPTS += tests/idltest.py
+
 noinst_PROGRAMS += tests/test-reconnect
 tests_test_reconnect_SOURCES = tests/test-reconnect.c
 tests_test_reconnect_LDADD = lib/libopenvswitch.a
diff --git a/tests/ovsdb-idl-py.at b/tests/ovsdb-idl-py.at
deleted file mode 100644 (file)
index bbee0c3..0000000
+++ /dev/null
@@ -1,149 +0,0 @@
-AT_BANNER([OVSDB -- interface description language (IDL) - Python])
-
-# OVSDB_CHECK_IDL(TITLE, [PRE-IDL-TXN], TRANSACTIONS, OUTPUT, [KEYWORDS],
-#                 [FILTER])
-#
-# Creates a database with a schema derived from idltest.ovsidl, runs
-# each PRE-IDL-TXN (if any), starts an ovsdb-server on that database,
-# and runs "test-ovsdb idl" passing each of the TRANSACTIONS along.
-#
-# Checks that the overall output is OUTPUT.  Before comparison, the
-# output is sorted (using "sort") and UUIDs in the output are replaced
-# by markers of the form <N> where N is a number.  The first unique
-# UUID is replaced by <0>, the next by <1>, and so on.  If a given
-# UUID appears more than once it is always replaced by the same
-# marker.  If FILTER is supplied then the output is also filtered
-# through the specified program.
-#
-# TITLE is provided to AT_SETUP and KEYWORDS to AT_KEYWORDS.
-m4_define([OVSDB_CHECK_IDL_PY], 
-  [AT_SETUP([$1])
-   AT_SKIP_IF([test $HAVE_PYTHON = no])
-   AT_KEYWORDS([ovsdb server idl positive Python $5])
-   AT_CHECK([ovsdb-tool create db $abs_srcdir/idltest.ovsschema],
-                  [0], [stdout], [ignore])
-   AT_CHECK([ovsdb-server '-vPATTERN:console:ovsdb-server|%c|%m' --detach --pidfile=$PWD/pid --remote=punix:socket --unixctl=$PWD/unixctl db], [0], [ignore], [ignore])
-   m4_if([$2], [], [],
-     [AT_CHECK([ovsdb-client transact unix:socket $2], [0], [ignore], [ignore], [kill `cat pid`])])
-   AT_CHECK([$PYTHON $srcdir/test-ovsdb.py  -t10 idl unix:socket $3], 
-            [0], [stdout], [ignore], [kill `cat pid`])
-   AT_CHECK([sort stdout | perl $srcdir/uuidfilt.pl]m4_if([$6],,, [[| $6]]),
-            [0], [$4], [], [kill `cat pid`])
-   OVSDB_SERVER_SHUTDOWN
-   AT_CLEANUP])
-
-OVSDB_CHECK_IDL_PY([simple idl, initially empty, no ops - Python],
-  [],
-  [],
-  [000: empty
-001: done
-])
-
-OVSDB_CHECK_IDL_PY([simple idl, initially empty, various ops - Python],
-  [],
-  [['["idltest",
-      {"op": "insert",
-       "table": "simple",
-       "row": {"i": 1,
-               "r": 2.0,
-               "b": true,
-               "s": "mystring",
-               "u": ["uuid", "84f5c8f5-ac76-4dbc-a24f-8860eb407fc1"],
-               "ia": ["set", [1, 2, 3]],
-               "ra": ["set", [-0.5]],
-               "ba": ["set", [true]],
-               "sa": ["set", ["abc", "def"]], 
-               "ua": ["set", [["uuid", "69443985-7806-45e2-b35f-574a04e720f9"],
-                              ["uuid", "aad11ef0-816a-4b01-93e6-03b8b4256b98"]]]}},
-      {"op": "insert",
-       "table": "simple",
-       "row": {}}]' \
-    '["idltest",
-      {"op": "update",
-       "table": "simple",
-       "where": [],
-       "row": {"b": true}}]' \
-    '["idltest",
-      {"op": "update",
-       "table": "simple",
-       "where": [],
-       "row": {"r": 123.5}}]' \
-    '["idltest",
-      {"op": "insert",
-       "table": "simple",
-       "row": {"i": -1,
-               "r": 125,
-               "b": false,
-               "s": "",
-               "ia": ["set", [1]],
-               "ra": ["set", [1.5]],
-               "ba": ["set", [false]],
-               "sa": ["set", []], 
-               "ua": ["set", []]}}]' \
-    '["idltest",
-      {"op": "update",
-       "table": "simple",
-       "where": [["i", "<", 1]],
-       "row": {"s": "newstring"}}]' \
-    '["idltest",
-      {"op": "delete",
-       "table": "simple",
-       "where": [["i", "==", 0]]}]' \
-    'reconnect']],
-  [[000: empty
-001: {"error":null,"result":[{"uuid":["uuid","<0>"]},{"uuid":["uuid","<1>"]}]}
-002: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
-002: i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=true sa=[abc def] ua=[<4> <5>] uuid=<0>
-003: {"error":null,"result":[{"count":2}]}
-004: i=0 r=0 b=true s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
-004: i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=true sa=[abc def] ua=[<4> <5>] uuid=<0>
-005: {"error":null,"result":[{"count":2}]}
-006: i=0 r=123.5 b=true s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
-006: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=true sa=[abc def] ua=[<4> <5>] uuid=<0>
-007: {"error":null,"result":[{"uuid":["uuid","<6>"]}]}
-008: i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5] ba=false sa=[] ua=[] uuid=<6>
-008: i=0 r=123.5 b=true s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
-008: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=true sa=[abc def] ua=[<4> <5>] uuid=<0>
-009: {"error":null,"result":[{"count":2}]}
-010: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=false sa=[] ua=[] uuid=<6>
-010: i=0 r=123.5 b=true s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
-010: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=true sa=[abc def] ua=[<4> <5>] uuid=<0>
-011: {"error":null,"result":[{"count":1}]}
-012: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=false sa=[] ua=[] uuid=<6>
-012: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=true sa=[abc def] ua=[<4> <5>] uuid=<0>
-013: reconnect
-014: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=false sa=[] ua=[] uuid=<6>
-014: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=true sa=[abc def] ua=[<4> <5>] uuid=<0>
-015: done
-]])
-
-OVSDB_CHECK_IDL_PY([simple idl, initially populated - Python],
-  [['["idltest",
-      {"op": "insert",
-       "table": "simple",
-       "row": {"i": 1,
-               "r": 2.0,
-               "b": true,
-               "s": "mystring",
-               "u": ["uuid", "84f5c8f5-ac76-4dbc-a24f-8860eb407fc1"],
-               "ia": ["set", [1, 2, 3]],
-               "ra": ["set", [-0.5]],
-               "ba": ["set", [true]],
-               "sa": ["set", ["abc", "def"]], 
-               "ua": ["set", [["uuid", "69443985-7806-45e2-b35f-574a04e720f9"],
-                              ["uuid", "aad11ef0-816a-4b01-93e6-03b8b4256b98"]]]}},
-      {"op": "insert",
-       "table": "simple",
-       "row": {}}]']],
-  [['["idltest",
-      {"op": "update",
-       "table": "simple",
-       "where": [],
-       "row": {"b": true}}]']],
-  [[000: i=0 r=0 b=false s= u=<0> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
-000: i=1 r=2 b=true s=mystring u=<2> ia=[1 2 3] ra=[-0.5] ba=true sa=[abc def] ua=[<3> <4>] uuid=<5>
-001: {"error":null,"result":[{"count":2}]}
-002: i=0 r=0 b=true s= u=<0> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
-002: i=1 r=2 b=true s=mystring u=<2> ia=[1 2 3] ra=[-0.5] ba=true sa=[abc def] ua=[<3> <4>] uuid=<5>
-003: done
-]])
index f9c8286..cce6197 100644 (file)
@@ -1,7 +1,7 @@
 AT_BANNER([OVSDB -- interface description language (IDL)])
 
-# OVSDB_CHECK_IDL(TITLE, [PRE-IDL-TXN], TRANSACTIONS, OUTPUT, [KEYWORDS],
-#                 [FILTER])
+# OVSDB_CHECK_IDL_C(TITLE, [PRE-IDL-TXN], TRANSACTIONS, OUTPUT, [KEYWORDS],
+#                   [FILTER])
 #
 # Creates a database with a schema derived from idltest.ovsidl, runs
 # each PRE-IDL-TXN (if any), starts an ovsdb-server on that database,
@@ -16,21 +16,42 @@ AT_BANNER([OVSDB -- interface description language (IDL)])
 # through the specified program.
 #
 # TITLE is provided to AT_SETUP and KEYWORDS to AT_KEYWORDS.
-m4_define([OVSDB_CHECK_IDL], 
-  [AT_SETUP([$1])
+m4_define([OVSDB_CHECK_IDL_C],
+  [AT_SETUP([$1 - C])
    AT_KEYWORDS([ovsdb server idl positive $5])
    AT_CHECK([ovsdb-tool create db $abs_srcdir/idltest.ovsschema],
                   [0], [stdout], [ignore])
    AT_CHECK([ovsdb-server '-vPATTERN:console:ovsdb-server|%c|%m' --detach --pidfile=$PWD/pid --remote=punix:socket --unixctl=$PWD/unixctl db], [0], [ignore], [ignore])
    m4_if([$2], [], [],
      [AT_CHECK([ovsdb-client transact unix:socket $2], [0], [ignore], [ignore], [kill `cat pid`])])
-   AT_CHECK([test-ovsdb '-vPATTERN:console:test-ovsdb|%c|%m' -vjsonrpc -t10 idl unix:socket $3], 
+   AT_CHECK([test-ovsdb '-vPATTERN:console:test-ovsdb|%c|%m' -vjsonrpc -t10 idl unix:socket $3],
             [0], [stdout], [ignore], [kill `cat pid`])
    AT_CHECK([sort stdout | perl $srcdir/uuidfilt.pl]m4_if([$6],,, [[| $6]]),
             [0], [$4], [], [kill `cat pid`])
    OVSDB_SERVER_SHUTDOWN
    AT_CLEANUP])
 
+# same as OVSDB_CHECK_IDL but uses the Python IDL implementation.
+m4_define([OVSDB_CHECK_IDL_PY],
+  [AT_SETUP([$1 - Python])
+   AT_SKIP_IF([test $HAVE_PYTHON = no])
+   AT_KEYWORDS([ovsdb server idl positive Python $5])
+   AT_CHECK([ovsdb-tool create db $abs_srcdir/idltest.ovsschema],
+                  [0], [stdout], [ignore])
+   AT_CHECK([ovsdb-server '-vPATTERN:console:ovsdb-server|%c|%m' --detach --pidfile=$PWD/pid --remote=punix:socket --unixctl=$PWD/unixctl db], [0], [ignore], [ignore])
+   m4_if([$2], [], [],
+     [AT_CHECK([ovsdb-client transact unix:socket $2], [0], [ignore], [ignore], [kill `cat pid`])])
+   AT_CHECK([$PYTHON $srcdir/test-ovsdb.py  -t10 idl $srcdir/idltest.ovsschema unix:socket $3],
+            [0], [stdout], [ignore], [kill `cat pid`])
+   AT_CHECK([sort stdout | perl $srcdir/uuidfilt.pl]m4_if([$6],,, [[| $6]]),
+            [0], [$4], [], [kill `cat pid`])
+   OVSDB_SERVER_SHUTDOWN
+   AT_CLEANUP])
+
+m4_define([OVSDB_CHECK_IDL],
+  [OVSDB_CHECK_IDL_C($@)
+   OVSDB_CHECK_IDL_PY($@)])
+
 OVSDB_CHECK_IDL([simple idl, initially empty, no ops],
   [],
   [],
@@ -51,7 +72,7 @@ OVSDB_CHECK_IDL([simple idl, initially empty, various ops],
                "ia": ["set", [1, 2, 3]],
                "ra": ["set", [-0.5]],
                "ba": ["set", [true]],
-               "sa": ["set", ["abc", "def"]], 
+               "sa": ["set", ["abc", "def"]],
                "ua": ["set", [["uuid", "69443985-7806-45e2-b35f-574a04e720f9"],
                               ["uuid", "aad11ef0-816a-4b01-93e6-03b8b4256b98"]]]}},
       {"op": "insert",
@@ -77,7 +98,7 @@ OVSDB_CHECK_IDL([simple idl, initially empty, various ops],
                "ia": ["set", [1]],
                "ra": ["set", [1.5]],
                "ba": ["set", [false]],
-               "sa": ["set", []], 
+               "sa": ["set", []],
                "ua": ["set", []]}}]' \
     '["idltest",
       {"op": "update",
@@ -128,7 +149,7 @@ OVSDB_CHECK_IDL([simple idl, initially populated],
                "ia": ["set", [1, 2, 3]],
                "ra": ["set", [-0.5]],
                "ba": ["set", [true]],
-               "sa": ["set", ["abc", "def"]], 
+               "sa": ["set", ["abc", "def"]],
                "ua": ["set", [["uuid", "69443985-7806-45e2-b35f-574a04e720f9"],
                               ["uuid", "aad11ef0-816a-4b01-93e6-03b8b4256b98"]]]}},
       {"op": "insert",
@@ -159,7 +180,7 @@ OVSDB_CHECK_IDL([simple idl, writing via IDL],
                "ia": ["set", [1, 2, 3]],
                "ra": ["set", [-0.5]],
                "ba": ["set", [true]],
-               "sa": ["set", ["abc", "def"]], 
+               "sa": ["set", ["abc", "def"]],
                "ua": ["set", [["uuid", "69443985-7806-45e2-b35f-574a04e720f9"],
                               ["uuid", "aad11ef0-816a-4b01-93e6-03b8b4256b98"]]]}},
       {"op": "insert",
index d64d75e..1620f69 100644 (file)
@@ -104,4 +104,3 @@ m4_include([tests/ovsdb-tool.at])
 m4_include([tests/ovsdb-server.at])
 m4_include([tests/ovsdb-monitor.at])
 m4_include([tests/ovsdb-idl.at])
-m4_include([tests/ovsdb-idl-py.at])
index 1774dd9..4b85c71 100644 (file)
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import codecs
 import getopt
 import re
 import os
 import signal
 import sys
+import uuid
 
 from ovs.db import error
 import ovs.db.idl
@@ -27,6 +27,7 @@ from ovs.db import types
 import ovs.ovsuuid
 import ovs.poller
 import ovs.util
+import idltest
 
 def unbox_json(json):
     if type(json) == list and len(json) == 1:
@@ -35,14 +36,14 @@ def unbox_json(json):
         return json
 
 def do_default_atoms():
-    for type in types.ATOMIC_TYPES:
-        if type == types.VoidType:
+    for type_ in types.ATOMIC_TYPES:
+        if type_ == types.VoidType:
             continue
 
-        sys.stdout.write("%s: " % type.to_string())
+        sys.stdout.write("%s: " % type_.to_string())
 
-        atom = data.Atom.default(type)
-        if atom != data.Atom.default(type):
+        atom = data.Atom.default(type_)
+        if atom != data.Atom.default(type_):
             sys.stdout.write("wrong\n")
             sys.exit(1)
 
@@ -59,14 +60,14 @@ def do_default_data():
                     valueBase = None
                 else:
                     valueBase = types.BaseType(value)
-                type = types.Type(types.BaseType(key), valueBase, n_min, 1)
-                assert type.is_valid()
+                type_ = types.Type(types.BaseType(key), valueBase, n_min, 1)
+                assert type_.is_valid()
 
                 sys.stdout.write("key %s, value %s, n_min %d: "
                                  % (key.to_string(), value.to_string(), n_min))
 
-                datum = data.Datum.default(type)
-                if datum != data.Datum.default(type):
+                datum = data.Datum.default(type_)
+                if datum != data.Datum.default(type_):
                     sys.stdout.write("wrong\n")
                     any_errors = True
                 else:
@@ -86,8 +87,8 @@ def do_parse_base_type(type_string):
 
 def do_parse_type(type_string):
     type_json = unbox_json(ovs.json.from_string(type_string))
-    type = types.Type.from_json(type_json)
-    print ovs.json.to_string(type.to_json(), sort_keys=True)
+    type_ = types.Type.from_json(type_json)
+    print ovs.json.to_string(type_.to_json(), sort_keys=True)
 
 def do_parse_atoms(type_string, *atom_strings):
     type_json = unbox_json(ovs.json.from_string(type_string))
@@ -102,10 +103,10 @@ def do_parse_atoms(type_string, *atom_strings):
 
 def do_parse_data(type_string, *data_strings):
     type_json = unbox_json(ovs.json.from_string(type_string))
-    type = types.Type.from_json(type_json)
+    type_ = types.Type.from_json(type_json)
     for datum_string in data_strings:
         datum_json = unbox_json(ovs.json.from_string(datum_string))
-        datum = data.Datum.from_json(type, datum_json)
+        datum = data.Datum.from_json(type_, datum_json)
         print ovs.json.to_string(datum.to_json())
 
 def do_sort_atoms(type_string, atom_strings):
@@ -127,26 +128,54 @@ def do_parse_table(name, table_string, default_is_root_string='false'):
     table = ovs.db.schema.TableSchema.from_json(table_json, name)
     print ovs.json.to_string(table.to_json(default_is_root), sort_keys=True)
 
-def do_parse_rows(table_string, *rows):
-    table_json = unbox_json(ovs.json.from_string(table_string))
-    table = ovs.db.schema.TableSchema.from_json(table_json, name)
-
 def do_parse_schema(schema_string):
     schema_json = unbox_json(ovs.json.from_string(schema_string))
     schema = ovs.db.schema.DbSchema.from_json(schema_json)
     print ovs.json.to_string(schema.to_json(), sort_keys=True)
 
 def print_idl(idl, step):
+    simple = idl.tables["simple"].rows
+    l1 = idl.tables["link1"].rows
+    l2 = idl.tables["link2"].rows
+
     n = 0
-    for uuid, row in idl.data["simple"].iteritems():
+    for row in simple.itervalues():
         s = ("%03d: i=%s r=%s b=%s s=%s u=%s "
              "ia=%s ra=%s ba=%s sa=%s ua=%s uuid=%s"
              % (step, row.i, row.r, row.b, row.s, row.u,
-                row.ia, row.ra, row.ba, row.sa, row.ua, uuid))
-        print(re.sub('""|,', "", s))
+                row.ia, row.ra, row.ba, row.sa, row.ua, row.uuid))
+        s = re.sub('""|,|u?\'', "", s)
+        s = re.sub('UUID\(([^)]+)\)', r'\1', s)
+        s = re.sub('False', 'false', s)
+        s = re.sub('True', 'true', s)
+        s = re.sub(r'(ba)=([^[][^ ]*) ', r'\1=[\2] ', s)
+        print(s)
         n += 1
+
+    for row in l1.itervalues():
+        s = ["%03d: i=%s k=" % (step, row.i)]
+        if row.k:
+            s.append(str(row.k.i))
+        s.append(" ka=[")
+        s.append(' '.join(sorted(str(ka.i) for ka in row.ka)))
+        s.append("] l2=")
+        if row.l2:
+            s.append(str(row.l2[0].i))
+        s.append(" uuid=%s" % row.uuid)
+        print(''.join(s))
+        n += 1
+
+    for row in l2.itervalues():
+        s = ["%03d: i=%s l1=" % (step, row.i)]
+        if row.l1:
+            s.append(str(row.l1.i))
+        s.append(" uuid=%s" % row.uuid)
+        print(''.join(s))
+        n += 1
+
     if not n:
         print("%03d: empty" % step)
+    sys.stdout.flush()
 
 def substitute_uuids(json, symtab):
     if type(json) in [str, unicode]:
@@ -174,8 +203,108 @@ def parse_uuids(json, symtab):
         for value in json.itervalues():
             parse_uuids(value, symtab)
 
-def do_idl(remote, *commands):
-    idl = ovs.db.idl.Idl(remote, "idltest")
+def idltest_find_simple(idl, i):
+    for row in idl.tables["simple"].rows.itervalues():
+        if row.i == i:
+            return row
+    return None
+
+def idl_set(idl, commands, step):
+    txn = ovs.db.idl.Transaction(idl)
+    increment = False
+    for command in commands.split(','):
+        words = command.split()
+        name = words[0]
+        args = words[1:]
+
+        if name == "set":
+            if len(args) != 3:
+                sys.stderr.write('"set" command requires 3 arguments\n')
+                sys.exit(1)
+
+            s = idltest_find_simple(idl, int(args[0]))
+            if not s:
+                sys.stderr.write('"set" command asks for nonexistent i=%d\n'
+                                 % int(args[0]))
+                sys.exit(1)
+
+            if args[1] == "b":
+                s.b = args[2] == "1"
+            elif args[1] == "s":
+                s.s = args[2]
+            elif args[1] == "u":
+                s.u = uuid.UUID(args[2])
+            elif args[1] == "r":
+                s.r = float(args[2])
+            else:
+                sys.stderr.write('"set" comamnd asks for unknown column %s\n'
+                                 % args[2])
+                sys.stderr.exit(1)
+        elif name == "insert":
+            if len(args) != 1:
+                sys.stderr.write('"set" command requires 1 argument\n')
+                sys.exit(1)
+
+            s = txn.insert(idl.tables["simple"])
+            s.i = int(args[0])
+        elif name == "delete":
+            if len(args) != 1:
+                sys.stderr.write('"delete" command requires 1 argument\n')
+                sys.exit(1)
+
+            s = idltest_find_simple(idl, int(args[0]))
+            if not s:
+                sys.stderr.write('"delete" command asks for nonexistent i=%d\n'
+                                 % int(args[0]))
+                sys.exit(1)
+            s.delete()
+        elif name == "verify":
+            if len(args) != 2:
+                sys.stderr.write('"verify" command requires 2 arguments\n')
+                sys.exit(1)
+
+            s = idltest_find_simple(idl, int(args[0]))
+            if not s:
+                sys.stderr.write('"verify" command asks for nonexistent i=%d\n'
+                                 % int(args[0]))
+                sys.exit(1)
+
+            if args[1] in ("i", "b", "s", "u", "r"):
+                s.verify(args[1])
+            else:
+                sys.stderr.write('"verify" command asks for unknown column '
+                                 '"%s"\n' % args[1])
+                sys.exit(1)
+        elif name == "increment":
+            if len(args) != 2:
+                sys.stderr.write('"increment" command requires 2 arguments\n')
+                sys.exit(1)
+
+            txn.increment(args[0], args[1], [])
+            increment = True
+        elif name == "abort":
+            txn.abort()
+            break
+        elif name == "destroy":
+            print "%03d: destroy" % step
+            sys.stdout.flush()
+            txn.abort()
+            return
+        else:
+            sys.stderr.write("unknown command %s\n" % name)
+            sys.exit(1)
+
+    status = txn.commit_block()
+    sys.stdout.write("%03d: commit, status=%s"
+                     % (step, ovs.db.idl.Transaction.status_to_string(status)))
+    if increment and status == ovs.db.idl.Transaction.SUCCESS:
+        sys.stdout.write(", increment=%d" % txn.get_increment_new_value())
+    sys.stdout.write("\n")
+    sys.stdout.flush()
+
+def do_idl(schema_file, remote, *commands):
+    schema = ovs.db.schema.DbSchema.from_json(ovs.json.from_file(schema_file))
+    idl = ovs.db.idl.Idl(remote, schema)
 
     if commands:
         error, stream = ovs.stream.Stream.open_block(
@@ -196,7 +325,7 @@ def do_idl(remote, *commands):
             command = command[1:]
         else:
             # Wait for update.
-            while idl.get_seqno() == seqno and not idl.run():
+            while idl.change_seqno == seqno and not idl.run():
                 rpc.run()
 
                 poller = ovs.poller.Poller()
@@ -207,10 +336,11 @@ def do_idl(remote, *commands):
             print_idl(idl, step)
             step += 1
 
-        seqno = idl.get_seqno()
+        seqno = idl.change_seqno
 
         if command == "reconnect":
             print("%03d: reconnect" % step)
+            sys.stdout.flush()
             step += 1
             idl.force_reconnect()
         elif not command.startswith("["):
@@ -235,10 +365,11 @@ def do_idl(remote, *commands):
                 parse_uuids(reply.result, symtab)
             reply.id = None
             sys.stdout.write("%s\n" % ovs.json.to_string(reply.to_json()))
+            sys.stdout.flush()
 
     if rpc:
         rpc.close()
-    while idl.get_seqno() == seqno and not idl.run():
+    while idl.change_seqno == seqno and not idl.run():
         poller = ovs.poller.Poller()
         idl.wait(poller)
         poller.block()
@@ -277,10 +408,10 @@ parse-table NAME OBJECT [DEFAULT-IS-ROOT]
   parse table NAME with info OBJECT
 parse-schema JSON
   parse JSON as an OVSDB schema, and re-serialize
-idl SERVER [TRANSACTION...]
-  connect to SERVER and dump the contents of the database
-  as seen initially by the IDL implementation and after
-  executing each TRANSACTION.  (Each TRANSACTION must modify
+idl SCHEMA SERVER [TRANSACTION...]
+  connect to SERVER (which has the specified SCHEMA) and dump the
+  contents of the database as seen initially by the IDL implementation
+  and after executing each TRANSACTION.  (Each TRANSACTION must modify
   the database or this command will hang.)
 
 The following options are also available:
@@ -313,8 +444,6 @@ def main(argv):
         else:
             sys.exit(0)
 
-    optKeys = [key for key, value in options]
-
     if not args:
         sys.stderr.write("%s: missing command argument "
                          "(use --help for help)\n" % ovs.util.PROGRAM_NAME)
@@ -331,7 +460,7 @@ def main(argv):
                 "parse-column": (do_parse_column, 2),
                 "parse-table": (do_parse_table, (2, 3)),
                 "parse-schema": (do_parse_schema, 1),
-                "idl": (do_idl, (1,))}
+                "idl": (do_idl, (2,))}
 
     command_name = args[0]
     args = args[1:]
index 57dc2e3..91ada47 100755 (executable)
@@ -32,6 +32,7 @@ import time
 
 import XenAPI
 
+import ovs.dirs
 from ovs.db import error
 from ovs.db import types
 import ovs.util
@@ -116,19 +117,26 @@ def call_vsctl(args):
     if exitcode != 0:
         s_log.warning("Couldn't call ovs-vsctl")
 
-def set_external_id(table, record, key, value):
-    if value:
-        col = 'external-ids:"%s"="%s"' % (key, value)
-        call_vsctl(["set", table, record, col])
+def set_or_delete(d, key, value):
+    if value is None:
+        if key in d:
+            del d[key]
+            return True
     else:
-        call_vsctl(["remove", table, record, "external-ids", key])
+        if d.get(key) != value:
+            d[key] = value
+            return True
+    return False
 
+def set_external_id(row, key, value):
+    external_ids = row.external_ids
+    if set_or_delete(external_ids, key, value):
+        row.external_ids = external_ids
 
 # XenServer does not call interface-reconfigure on internal networks,
 # which is where the fail-mode would normally be set.
-def update_fail_mode(name):
-    rec = get_network_by_bridge(name)
-
+def update_fail_mode(row):
+    rec = get_network_by_bridge(row.name)
     if not rec:
         return
 
@@ -143,80 +151,57 @@ def update_fail_mode(name):
     if fail_mode not in ['standalone', 'secure']:
         fail_mode = 'standalone'
 
-    call_vsctl(["set", "bridge", name, "fail_mode=" + fail_mode])
-
-def update_in_band_mgmt(name):
-    rec = get_network_by_bridge(name)
+    if row.fail_mode != fail_mode:
+        row.fail_mode = fail_mode
 
+def update_in_band_mgmt(row):
+    rec = get_network_by_bridge(row.name)
     if not rec:
         return
 
     dib = rec['other_config'].get('vswitch-disable-in-band')
-    if not dib:
-        call_vsctl(['remove', 'bridge', name, 'other_config',
-                    'disable-in-band'])
-    elif dib in ['true', 'false']:
-        call_vsctl(['set', 'bridge', name,
-                    'other_config:disable-in-band=' + dib])
-    else:
-        s_log.warning('"' + dib + '"'
-                      "isn't a valid setting for other_config:disable-in-band on " +
-                      name)
 
-def update_bridge_id(name, ids):
-    id = get_bridge_id(name, ids.get("xs-network-uuids"))
+    other_config = row.other_config
+    if dib and dib not in ['true', 'false']:
+        s_log.warning('"%s" isn\'t a valid setting for '
+                      "other_config:disable-in-band on %s" % (dib, row.name))
+    elif set_or_delete(other_config, 'disable-in-band', dib):
+        row.other_config = other_config
 
-    if not id:
+def update_bridge_id(row):
+    id_ = get_bridge_id(row.name, row.external_ids.get("xs-network-uuids"))
+    if not id_:
         return
 
-    primary_id = id.split(";")[0]
-
-    if ids.get("bridge-id") != primary_id:
-        set_external_id("Bridge", name, "bridge-id", primary_id)
-        ids["bridge-id"] = primary_id
-
-def update_iface(name, ids):
-    id = get_iface_id(name, ids.get("xs-vif-uuid"))
-    if ids.get("iface-id") != id and id:
-        set_external_id("Interface", name, "iface-id", id)
-        ids["iface-id"] = id
-
-    status = ids.get("iface-status")
-    if status:
-        set_external_id("Interface", name, "iface-status", status)
+    set_external_id(row, "bridge-id", id_.split(";")[0])
 
-def keep_table_columns(schema, table_name, column_types):
+def keep_table_columns(schema, table_name, columns):
     table = schema.tables.get(table_name)
     if not table:
         raise error.Error("schema has no %s table" % table_name)
 
     new_columns = {}
-    for column_name, column_type in column_types.iteritems():
+    for column_name in columns:
         column = table.columns.get(column_name)
         if not column:
             raise error.Error("%s table schema lacks %s column"
                               % (table_name, column_name))
-        if column.type != column_type:
-            raise error.Error("%s column in %s table has type \"%s\", "
-                              "expected type \"%s\""
-                              % (column_name, table_name,
-                                 column.type.toEnglish(),
-                                 column_type.toEnglish()))
         new_columns[column_name] = column
     table.columns = new_columns
     return table
 
-def monitor_uuid_schema_cb(schema):
+def prune_schema(schema):
     string_type = types.Type(types.BaseType(types.StringType))
     string_map_type = types.Type(types.BaseType(types.StringType),
                                  types.BaseType(types.StringType),
                                  0, sys.maxint)
 
     new_tables = {}
-    for table_name in ("Bridge", "Interface"):
-        new_tables[table_name] = keep_table_columns(
-            schema, table_name, {"name": string_type,
-                                 "external_ids": string_map_type})
+    new_tables["Bridge"] = keep_table_columns(
+        schema, "Bridge", ("name", "external_ids", "other_config",
+                           "fail_mode"))
+    new_tables["Interface"] = keep_table_columns(
+        schema, "Interface", ("name", "external_ids"))
     schema.tables = new_tables
 
 def usage():
@@ -227,32 +212,11 @@ def usage():
     print "  -h, --help               display this help message"
     sys.exit(0)
 
-def handler(signum, frame):
+def handler(signum, _):
     global force_run
     if (signum == signal.SIGHUP):
         force_run = True
 
-def update_tap_from_vif(idl, tap_name, vif_name):
-    ifaces = idl.data["Interface"]
-    tap = None
-    vif = None
-
-    for i in ifaces.values():
-        name = i.name.as_scalar().strip('"')
-        if name == tap_name:
-            tap = i
-        elif name == vif_name:
-            vif = i
-
-    if vif and tap:
-        vxid = vif.external_ids
-        txid = tap.external_ids
-
-        keys = ["attached-mac", "xs-network-uuid", "xs-vif-uuid", "xs-vm-uuid"]
-        for k in keys:
-            if vxid.get(k) != txid.get(k):
-                set_external_id("Interface", tap_name, k, vxid.get(k))
-
 def main(argv):
     global force_run
 
@@ -284,7 +248,10 @@ def main(argv):
         sys.exit(1)
 
     remote = args[0]
-    idl = ovs.db.idl.Idl(remote, "Open_vSwitch", monitor_uuid_schema_cb)
+    schema_file = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
+    schema = ovs.db.schema.DbSchema.from_json(ovs.json.from_file(schema_file))
+    prune_schema(schema)
+    idl = ovs.db.idl.Idl(remote, schema)
 
     ovs.daemon.daemonize()
 
@@ -295,8 +262,8 @@ def main(argv):
 
     signal.signal(signal.SIGHUP, handler)
 
-    bridges = {}
-    interfaces = {}
+    bridges = {}                # Map from bridge name to xs_network_uuids
+    interfaces = {}             # Map from interface name to 
     while True:
         if not force_run and not idl.run():
             poller = ovs.poller.Poller()
@@ -310,58 +277,77 @@ def main(argv):
             interfaces = {}
             force_run  = False
 
+        txn = ovs.db.idl.Transaction(idl)
+
         new_bridges = {}
-        for rec in idl.data["Bridge"].itervalues():
-            name = rec.name.as_scalar()
-            xs_network_uuids = rec.external_ids.get("xs-network-uuids")
-            bridge_id = rec.external_ids.get("bridge-id")
-            new_bridges[name] = {"xs-network-uuids": xs_network_uuids,
-                                 "bridge-id": bridge_id}
+        for row in idl.tables["Bridge"].rows.itervalues():
+            old_xnu = bridges.get(row.name)
+            new_xnu = row.external_ids.get("xs-network-uuids", "")
+            if old_xnu is None:
+                # New bridge.
+                update_fail_mode(row)
+                update_in_band_mgmt(row)
+            if new_xnu != old_xnu:
+                # New bridge or bridge's xs-network-uuids has changed.
+                update_bridge_id(row)
+            new_bridges[row.name] = new_xnu
+        bridges = new_bridges
+
+        iface_by_name = {}
+        for row in idl.tables["Interface"].rows.itervalues():
+            iface_by_name[row.name] = row
 
         new_interfaces = {}
-        for rec in idl.data["Interface"].itervalues():
-            name = rec.name.as_scalar()
-            xs_vif_uuid = rec.external_ids.get("xs-vif-uuid")
-            iface_id = rec.external_ids.get("iface-id")
-            new_interfaces[name] = {"xs-vif-uuid": xs_vif_uuid,
-                                    "iface-id": iface_id}
-
-            if name.startswith("vif"):
-                new_interfaces[name]["iface-status"] = "active"
-
-        #Tap devices take their xs-vif-uuid from their corresponding vif and
-        #cause that vif to be labled inactive.
-        for name in new_interfaces:
-            if not name.startswith("tap"):
-                continue
-
-            vif = name.replace("tap", "vif", 1)
-
-            if vif in new_interfaces:
-                xs_vif_uuid = new_interfaces[vif]["xs-vif-uuid"]
-                new_interfaces[name]["xs-vif-uuid"] = xs_vif_uuid
-
-                new_interfaces[vif]["iface-status"] = "inactive"
-                new_interfaces[name]["iface-status"] = "active"
-
-                update_tap_from_vif(idl, name, vif)
-
-        if bridges != new_bridges:
-            for name,ids in new_bridges.items():
-                if name not in bridges:
-                    update_fail_mode(name)
-                    update_in_band_mgmt(name)
-
-                if (name not in bridges) or (bridges[name] != ids):
-                    update_bridge_id(name, ids)
-
-            bridges = new_bridges
-
-        if interfaces != new_interfaces:
-            for name,ids in new_interfaces.items():
-                if (name not in interfaces) or (interfaces[name] != ids):
-                    update_iface(name, ids)
-            interfaces = new_interfaces
+        for row in idl.tables["Interface"].rows.itervalues():
+            # Match up paired vif and tap devices.
+            if row.name.startswith("vif"):
+                vif = row
+                tap = iface_by_name.get("tap%s" % row.name[3:])
+            elif row.name.startswith("tap"):
+                tap = row
+                vif = iface_by_name.get("vif%s" % row.name[3:])
+            else:
+                tap = vif = None
+
+            # Several tap external-ids need to be copied from the vif.
+            if row == tap and vif:
+                keys = ["attached-mac",
+                        "xs-network-uuid",
+                        "xs-vif-uuid",
+                        "xs-vm-uuid"]
+                for k in keys:
+                    set_external_id(row, k, vif.external_ids.get(k))
+
+            # If it's a new interface or its xs-vif-uuid has changed, then
+            # obtain the iface-id from XAPI.
+            #
+            # (A tap's xs-vif-uuid comes from its vif.  That falls out
+            # naturally from the copy loop above.)
+            new_xvu = row.external_ids.get("xs-vif-uuid", "")
+            old_xvu = interfaces.get(row.name)
+            if old_xvu != new_xvu:
+                iface_id = get_iface_id(row.name, new_xvu)
+                if iface_id and row.external_ids.get("iface-id") != iface_id:
+                    set_external_id(row, "iface-id", iface_id)
+
+            # When there's a vif and a tap, the tap is active (used for
+            # traffic).  When there's just a vif, the vif is active.
+            #
+            # A tap on its own shouldn't happen, and we don't know
+            # anything about other kinds of devices, so we don't use
+            # an iface-status for those devices at all.
+            if vif and tap:
+                set_external_id(tap, "iface-status", "active")
+                set_external_id(vif, "iface-status", "inactive")
+            elif vif:
+                set_external_id(vif, "iface-status", "active")
+            else:
+                set_external_id(row, "iface-status", None)
+
+            new_interfaces[row.name] = new_xvu
+        interfaces = new_interfaces
+
+        txn.commit_block()
 
 if __name__ == '__main__':
     try: