idl: Convert python daemons to utilize SchemaHelper.
diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
index d01fde8..5639120 100644
--- a/python/ovs/db/idl.py
+++ b/python/ovs/db/idl.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2009, 2010, 2011 Nicira Networks
+# Copyright (c) 2009, 2010, 2011, 2012 Nicira Networks
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging
 import uuid
 
 import ovs.jsonrpc
@@ -21,6 +20,12 @@ import ovs.db.schema
 from ovs.db import error
 import ovs.ovsuuid
 import ovs.poller
+import ovs.vlog
+
+vlog = ovs.vlog.Vlog("idl")
+
+__pychecker__ = 'no-classattr no-objattrs'
+
 
 class Idl:
     """Open vSwitch Database Interface Definition Language (OVSDB IDL).
@@ -83,8 +88,14 @@ class Idl:
         purpose of the return value of Idl.run() and Idl.change_seqno.  This is
         useful for columns that the IDL's client will write but not read.
 
+        'schema' must be an instance of the SchemaHelper class; the IDL
+        obtains the schema it replicates by calling schema.get_idl_schema().
+
         The IDL uses and modifies 'schema' directly."""
 
+        assert isinstance(schema, SchemaHelper)
+        schema = schema.get_idl_schema()
+
         self.tables = schema.tables
         self._db = schema
         self._session = ovs.jsonrpc.Session.open(remote)
@@ -95,7 +106,7 @@ class Idl:
         # Database locking.
         self.lock_name = None          # Name of lock we need, None if none.
         self.has_lock = False          # Has db server said we have the lock?
-        self.is_lock_contended = False # Has db server said we can't get lock?
+        self.is_lock_contended = False  # Has db server said we can't get lock?
         self._lock_request_id = None   # JSON-RPC ID of in-flight lock request.
 
         # Transaction support.
@@ -170,8 +181,8 @@ class Idl:
                     self.__clear()
                     self.__parse_update(msg.result)
                 except error.Error, e:
-                    logging.error("%s: parse error in received schema: %s"
-                                  % (self._session.get_name(), e))
+                    vlog.err("%s: parse error in received schema: %s"
+                              % (self._session.get_name(), e))
                     self.__error()
             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                   and self._lock_request_id is not None
@@ -197,9 +208,9 @@ class Idl:
             else:
                 # This can happen if a transaction is destroyed before we
                 # receive the reply, so keep the log level low.
-                logging.debug("%s: received unexpected %s message"
-                              % (self._session.get_name(),
-                                 ovs.jsonrpc.Message.type_to_string(msg.type)))
+                vlog.dbg("%s: received unexpected %s message"
+                         % (self._session.get_name(),
+                             ovs.jsonrpc.Message.type_to_string(msg.type)))
 
         return initial_change_seqno != self.change_seqno
 
@@ -316,8 +327,8 @@ class Idl:
         try:
             self.__do_parse_update(update)
         except error.Error, e:
-            logging.error("%s: error parsing update: %s"
-                          % (self._session.get_name(), e))
+            vlog.err("%s: error parsing update: %s"
+                     % (self._session.get_name(), e))
 
     def __do_parse_update(self, table_updates):
         if type(table_updates) != dict:
@@ -371,8 +382,8 @@ class Idl:
                 changed = True
             else:
                 # XXX rate-limit
-                logging.warning("cannot delete missing row %s from table %s"
-                                % (uuid, table.name))
+                vlog.warn("cannot delete missing row %s from table %s"
+                          % (uuid, table.name))
         elif not old:
             # Insert row.
             if not row:
@@ -380,8 +391,8 @@ class Idl:
                 changed = True
             else:
                 # XXX rate-limit
-                logging.warning("cannot add existing row %s to table %s"
-                                % (uuid, table.name))
+                vlog.warn("cannot add existing row %s to table %s"
+                          % (uuid, table.name))
             if self.__row_update(table, row, new):
                 changed = True
         else:
@@ -389,8 +400,8 @@ class Idl:
                 row = self.__create_row(table, uuid)
                 changed = True
                 # XXX rate-limit
-                logging.warning("cannot modify missing row %s in table %s"
-                                % (uuid, table.name))
+                vlog.warn("cannot modify missing row %s in table %s"
+                          % (uuid, table.name))
             if self.__row_update(table, row, new):
                 changed = True
         return changed
@@ -401,16 +412,16 @@ class Idl:
             column = table.columns.get(column_name)
             if not column:
                 # XXX rate-limit
-                logging.warning("unknown column %s updating table %s"
-                                % (column_name, table.name))
+                vlog.warn("unknown column %s updating table %s"
+                          % (column_name, table.name))
                 continue
 
             try:
                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
             except error.Error, e:
                 # XXX rate-limit
-                logging.warning("error parsing column %s in table %s: %s"
-                                % (column_name, table.name, e))
+                vlog.warn("error parsing column %s in table %s: %s"
+                          % (column_name, table.name, e))
                 continue
 
             if datum != row._data[column_name]:
@@ -436,25 +447,28 @@ class Idl:
     def __txn_abort_all(self):
         while self._outstanding_txns:
             txn = self._outstanding_txns.popitem()[1]
-            txn._status = Transaction.TRY_AGAIN
+            txn._status = Transaction.AGAIN_WAIT
 
     def __txn_process_reply(self, msg):
         txn = self._outstanding_txns.pop(msg.id, None)
         if txn:
             txn._process_reply(msg)
 
+
 def _uuid_to_row(atom, base):
     if base.ref_table:
         return base.ref_table.rows.get(atom)
     else:
         return atom
 
+
 def _row_to_uuid(value):
     if type(value) == Row:
         return value.uuid
     else:
         return value
 
+
 class Row(object):
     """A row within an IDL.
 
@@ -542,8 +556,8 @@ class Row(object):
                                                   _row_to_uuid)
         except error.Error, e:
             # XXX rate-limit
-            logging.error("attempting to write bad value to column %s (%s)"
-                          % (column_name, e))
+            vlog.err("attempting to write bad value to column %s (%s)"
+                     % (column_name, e))
             return
         self._idl.txn._write(self, column, datum)
 
@@ -553,7 +567,9 @@ class Row(object):
         if 'column_name' changed in this row (or if this row was deleted)
         between the time that the IDL originally read its contents and the time
         that the transaction commits, then the transaction aborts and
-        Transaction.commit() returns Transaction.TRY_AGAIN.
+        Transaction.commit() returns Transaction.AGAIN_WAIT or
+        Transaction.AGAIN_NOW (depending on whether the database change has
+        already been received).
 
         The intention is that, to ensure that no transaction commits based on
         dirty reads, an application should call Row.verify() on each data item
@@ -590,29 +606,36 @@ class Row(object):
         self.__dict__["_changes"] = None
         del self._table.rows[self.uuid]
 
+
 def _uuid_name_from_uuid(uuid):
     return "row%s" % str(uuid).replace("-", "_")
 
+
 def _where_uuid_equals(uuid):
     return [["_uuid", "==", ["uuid", str(uuid)]]]
 
+
 class _InsertedRow(object):
     def __init__(self, op_index):
         self.op_index = op_index
         self.real = None
 
+
 class Transaction(object):
     # Status values that Transaction.commit() can return.
-    UNCOMMITTED = "uncommitted" # Not yet committed or aborted.
-    UNCHANGED = "unchanged"     # Transaction didn't include any changes.
-    INCOMPLETE = "incomplete"   # Commit in progress, please wait.
-    ABORTED = "aborted"         # ovsdb_idl_txn_abort() called.
-    SUCCESS = "success"         # Commit successful.
-    TRY_AGAIN = "try again"     # Commit failed because a "verify" operation
-                                # reported an inconsistency, due to a network
-                                # problem, or other transient failure.
-    NOT_LOCKED = "not locked"   # Server hasn't given us the lock yet.
-    ERROR = "error"             # Commit failed due to a hard error.
+    UNCOMMITTED = "uncommitted"  # Not yet committed or aborted.
+    UNCHANGED = "unchanged"      # Transaction didn't include any changes.
+    INCOMPLETE = "incomplete"    # Commit in progress, please wait.
+    ABORTED = "aborted"          # ovsdb_idl_txn_abort() called.
+    SUCCESS = "success"          # Commit successful.
+    AGAIN_WAIT = "wait then try again"
+                                 # Commit failed because a "verify" operation
+                                 # reported an inconsistency, due to a network
+                                 # problem, or other transient failure.  Wait
+                                 # for a change, then try again.
+    AGAIN_NOW = "try again now"  # Same as AGAIN_WAIT but try again right away.
+    NOT_LOCKED = "not locked"    # Server hasn't given us the lock yet.
+    ERROR = "error"              # Commit failed due to a hard error.
 
     @staticmethod
     def status_to_string(status):
@@ -645,12 +668,13 @@ class Transaction(object):
         self._status = Transaction.UNCOMMITTED
         self._error = None
         self._comments = []
+        self._commit_seqno = self.idl.change_seqno
 
         self._inc_table = None
         self._inc_column = None
         self._inc_where = None
 
-        self._inserted_rows = {} # Map from UUID to _InsertedRow
+        self._inserted_rows = {}  # Map from UUID to _InsertedRow
 
     def add_comment(self, comment):
         """Appens 'comment' to the comments that will be passed to the OVSDB
@@ -768,7 +792,8 @@ class Transaction(object):
 
                 for column_name, datum in row._changes.iteritems():
                     if row._data is not None or not datum.is_default():
-                        row_json[column_name] = self._substitute_uuids(datum.to_json())
+                        row_json[column_name] = (
+                                self._substitute_uuids(datum.to_json()))
 
                         # If anything really changed, consider it an update.
                         # We can't suppress not-really-changed values earlier
@@ -787,11 +812,13 @@ class Transaction(object):
 
             operations.append({"op": "mutate",
                                "table": self._inc_table,
-                               "where": self._substitute_uuids(self._inc_where),
+                               "where": self._substitute_uuids(
+                                   self._inc_where),
                                "mutations": [[self._inc_column, "+=", 1]]})
             operations.append({"op": "select",
                                "table": self._inc_table,
-                               "where": self._substitute_uuids(self._inc_where),
+                               "where": self._substitute_uuids(
+                                   self._inc_where),
                                "columns": [self._inc_column]})
 
         # Add comment.
@@ -812,7 +839,7 @@ class Transaction(object):
                 self.idl._outstanding_txns[self._request_id] = self
                 self._status = Transaction.INCOMPLETE
             else:
-                self._status = Transaction.TRY_AGAIN
+                self._status = Transaction.AGAIN_WAIT
 
         self.__disassemble()
         return self._status
@@ -923,7 +950,7 @@ class Transaction(object):
             self._status = Transaction.ERROR
         elif type(msg.result) not in (list, tuple):
             # XXX rate-limit
-            logging.warning('reply to "transact" is not JSON array')
+            vlog.warn('reply to "transact" is not JSON array')
         else:
             hard_errors = False
             soft_errors = False
@@ -952,8 +979,7 @@ class Transaction(object):
                     hard_errors = True
                     self.__set_error_json(op)
                     # XXX rate-limit
-                    logging.warning("operation reply is not JSON null or "
-                                    "object")
+                    vlog.warn("operation reply is not JSON null or object")
 
             if not soft_errors and not hard_errors and not lock_errors:
                 if self._inc_table and not self.__process_inc_reply(ops):
@@ -968,7 +994,10 @@ class Transaction(object):
             elif lock_errors:
                 self._status = Transaction.NOT_LOCKED
             elif soft_errors:
-                self._status = Transaction.TRY_AGAIN
+                if self._commit_seqno == self.idl.change_seqno:
+                    self._status = Transaction.AGAIN_WAIT
+                else:
+                    self._status = Transaction.AGAIN_NOW
             else:
                 self._status = Transaction.SUCCESS
 
@@ -976,11 +1005,11 @@ class Transaction(object):
     def __check_json_type(json, types, name):
         if not json:
             # XXX rate-limit
-            logging.warning("%s is missing" % name)
+            vlog.warn("%s is missing" % name)
             return False
         elif type(json) not in types:
             # XXX rate-limit
-            logging.warning("%s has unexpected type %s" % (name, type(json)))
+            vlog.warn("%s has unexpected type %s" % (name, type(json)))
             return False
         else:
             return True
@@ -988,9 +1017,9 @@ class Transaction(object):
     def __process_inc_reply(self, ops):
         if self._inc_index + 2 > len(ops):
             # XXX rate-limit
-            logging.warning("reply does not contain enough operations for "
-                            "increment (has %d, needs %d)" %
-                            (len(ops), self._inc_index + 2))
+            vlog.warn("reply does not contain enough operations for "
+                      "increment (has %d, needs %d)" %
+                      (len(ops), self._inc_index + 2))
 
         # We know that this is a JSON object because the loop in
         # __process_reply() already checked.
@@ -1001,8 +1030,7 @@ class Transaction(object):
             return False
         if count != 1:
             # XXX rate-limit
-            logging.warning('"mutate" reply "count" is %d instead of 1'
-                            % count)
+            vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
             return False
 
         select = ops[self._inc_index + 1]
@@ -1012,8 +1040,8 @@ class Transaction(object):
             return False
         if len(rows) != 1:
             # XXX rate-limit
-            logging.warning('"select" reply "rows" has %d elements '
-                            'instead of 1' % len(rows))
+            vlog.warn('"select" reply "rows" has %d elements '
+                      'instead of 1' % len(rows))
             return False
         row = rows[0]
         if not Transaction.__check_json_type(row, (dict,),
@@ -1029,9 +1057,9 @@ class Transaction(object):
     def __process_insert_reply(self, insert, ops):
         if insert.op_index >= len(ops):
             # XXX rate-limit
-            logging.warning("reply does not contain enough operations "
-                            "for insert (has %d, needs %d)"
-                            % (len(ops), insert.op_index))
+            vlog.warn("reply does not contain enough operations "
+                      "for insert (has %d, needs %d)"
+                      % (len(ops), insert.op_index))
             return False
 
         # We know that this is a JSON object because the loop in
@@ -1046,8 +1074,81 @@ class Transaction(object):
             uuid_ = ovs.ovsuuid.from_json(json_uuid)
         except error.Error:
             # XXX rate-limit
-            logging.warning('"insert" reply "uuid" is not a JSON UUID')
+            vlog.warn('"insert" reply "uuid" is not a JSON UUID')
             return False
 
         insert.real = uuid_
         return True
+
+
+class SchemaHelper(object):
+    """IDL Schema helper.
+
+    This class encapsulates the logic required to generate schemas suitable
+    for creating 'ovs.db.idl.Idl' objects.  Clients should register columns
+    they are interested in using register_columns().  When finished, the
+    get_idl_schema() function may be called.
+
+    The location on disk of the schema used may be found in the
+    'schema_location' variable."""
+
+    def __init__(self, location=None):
+        """Creates a new Schema object."""
+
+        if location is None:
+            location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
+
+        self.schema_location = location
+        self._tables = {}
+        self._all = False
+
+    def register_columns(self, table, columns):
+        """Registers interest in the given 'columns' of 'table'.  Future calls
+        to get_idl_schema() will include 'table':column for each column in
+        'columns'. This function automatically avoids adding duplicate entries
+        to the schema.
+
+        'table' must be a string.
+        'columns' must be a list of strings.
+        """
+
+        assert type(table) is str
+        assert type(columns) is list
+
+        columns = set(columns) | self._tables.get(table, set())
+        self._tables[table] = columns
+
+    def register_all(self):
+        """Registers interest in every column of every table."""
+        self._all = True
+
+    def get_idl_schema(self):
+        """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
+        object based on columns registered using the register_columns()
+        function."""
+
+        schema = ovs.db.schema.DbSchema.from_json(
+            ovs.json.from_file(self.schema_location))
+
+        if not self._all:
+            schema_tables = {}
+            for table, columns in self._tables.iteritems():
+                schema_tables[table] = (
+                    self._keep_table_columns(schema, table, columns))
+
+            schema.tables = schema_tables
+        return schema
+
+    def _keep_table_columns(self, schema, table_name, columns):
+        assert table_name in schema.tables
+        table = schema.tables[table_name]
+
+        new_columns = {}
+        for column_name in columns:
+            assert type(column_name) is str
+            assert column_name in table.columns
+
+            new_columns[column_name] = table.columns[column_name]
+
+        table.columns = new_columns
+        return table
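
A minimal usage sketch, not part of the patch above: one way a client might
combine SchemaHelper with Idl and handle the new AGAIN_WAIT/AGAIN_NOW commit
statuses.  The database socket path and the "Open_vSwitch"/"next_cfg" table
and column names are illustrative assumptions drawn from the standard vswitch
schema, not something this commit prescribes.

    import ovs.db.idl
    import ovs.poller


    def bump_next_cfg(remote="unix:/var/run/openvswitch/db.sock"):
        # The socket path, table, and column here are assumptions for the
        # sake of the example; substitute whatever your daemon actually uses.
        helper = ovs.db.idl.SchemaHelper()   # defaults to vswitch.ovsschema
        helper.register_columns("Open_vSwitch", ["next_cfg"])
        idl = ovs.db.idl.Idl(remote, helper)

        # Wait until the IDL has replicated the initial database contents.
        seqno = idl.change_seqno
        while idl.change_seqno == seqno:
            idl.run()
            poller = ovs.poller.Poller()
            idl.wait(poller)
            poller.block()

        while True:
            seqno = idl.change_seqno
            txn = ovs.db.idl.Transaction(idl)
            for row in idl.tables["Open_vSwitch"].rows.itervalues():
                row.verify("next_cfg")       # abort the commit on dirty reads
                row.next_cfg = row.next_cfg + 1

            status = txn.commit_block()
            if status in (ovs.db.idl.Transaction.SUCCESS,
                          ovs.db.idl.Transaction.UNCHANGED):
                return
            elif status == ovs.db.idl.Transaction.AGAIN_NOW:
                continue                     # conflicting update already seen
            elif status == ovs.db.idl.Transaction.AGAIN_WAIT:
                # The conflicting update has not arrived yet; wait for it.
                while idl.change_seqno == seqno:
                    idl.run()
                    poller = ovs.poller.Poller()
                    idl.wait(poller)
                    poller.block()
            else:
                raise RuntimeError(
                    "commit failed: %s"
                    % ovs.db.idl.Transaction.status_to_string(status))

The loop distinguishes the two new soft-failure statuses: AGAIN_NOW means the
conflicting database change has already been replicated, so retrying
immediately is useful, while AGAIN_WAIT means the client should wait for
idl.change_seqno to advance before building the transaction again.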