--- /dev/null
+# Copyright (c) 2009, 2010 Nicira Networks
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+import ovs.jsonrpc
+import ovs.db.schema
+from ovs.db import error
+import ovs.ovsuuid
+
+class Idl:
+ """Open vSwitch Database Interface Definition Language (OVSDB IDL).
+
+ The OVSDB IDL maintains an in-memory replica of a database. It issues RPC
+ requests to an OVSDB database server and parses the responses, converting
+ raw JSON into data structures that are easier for clients to digest.
+
+ The IDL also assists with issuing database transactions. The client
+ creates a transaction, manipulates the IDL data structures, and commits or
+ aborts the transaction. The IDL then composes and issues the necessary
+ JSON-RPC requests and reports to the client whether the transaction
+ completed successfully.
+
+ If 'schema_cb' is provided, it should be a callback function that accepts
+ an ovs.db.schema.DbSchema as its argument. It should determine whether the
+ schema is acceptable and raise an ovs.db.error.Error if it is not. It may
+ also delete any tables or columns from the schema that the client has no
+ interest in monitoring, to save time and bandwidth during monitoring. Its
+ return value is ignored."""
+
+ def __init__(self, remote, db_name, schema_cb=None):
+ """Creates and returns a connection to the database named 'db_name' on
+ 'remote', which should be in a form acceptable to
+ ovs.jsonrpc.session.open(). The connection will maintain an in-memory
+ replica of the remote database."""
+ self.remote = remote
+ self.session = ovs.jsonrpc.Session.open(remote)
+ self.db_name = db_name
+ self.last_seqno = None
+ self.schema = None
+ self.state = None
+ self.change_seqno = 0
+ self.data = {}
+ self.schema_cb = schema_cb
+
+ def close(self):
+ self.session.close()
+
+ def run(self):
+ """Processes a batch of messages from the database server. Returns
+ True if the database as seen through the IDL changed, False if it did
+ not change. The initial fetch of the entire contents of the remote
+ database is considered to be one kind of change.
+
+ This function can return occasional false positives, that is, report
+ that the database changed even though it didn't. This happens if the
+ connection to the database drops and reconnects, which causes the
+ database contents to be reloaded even if they didn't change. (It could
+ also happen if the database server sends out a "change" that reflects
+ what we already thought was in the database, but the database server is
+ not supposed to do that.)
+
+ As an alternative to checking the return value, the client may check
+ for changes in the value returned by self.get_seqno()."""
+ initial_change_seqno = self.change_seqno
+ self.session.run()
+ if self.session.is_connected():
+ seqno = self.session.get_seqno()
+ if seqno != self.last_seqno:
+ self.last_seqno = seqno
+ self.state = (self.__send_schema_request, None)
+ if self.state:
+ self.state[0]()
+ return initial_change_seqno != self.change_seqno
+
+ def wait(self, poller):
+ """Arranges for poller.block() to wake up when self.run() has something
+ to do or when activity occurs on a transaction on 'self'."""
+ self.session.wait(poller)
+ if self.state and self.state[1]:
+ self.state[1](poller)
+
+ def get_seqno(self):
+ """Returns a number that represents the IDL's state. When the IDL
+ updated (by self.run()), the return value changes."""
+ return self.change_seqno
+
+ def __send_schema_request(self):
+ msg = ovs.jsonrpc.Message.create_request("get_schema", [self.db_name])
+ self.session.send(msg)
+ self.state = (lambda: self.__recv_schema(msg.id), self.__recv_wait)
+
+ def __recv_schema(self, id):
+ msg = self.session.recv()
+ if msg and msg.type == ovs.jsonrpc.Message.T_REPLY and msg.id == id:
+ try:
+ self.schema = ovs.db.schema.DbSchema.from_json(msg.result)
+ except error.Error, e:
+ logging.error("%s: parse error in received schema: %s"
+ % (self.remote, e))
+ self.__error()
+ return
+
+ if self.schema_cb:
+ try:
+ self.schema_cb(self.schema)
+ except error.Error, e:
+ logging.error("%s: error validating schema: %s"
+ % (self.remote, e))
+ self.__error()
+ return
+
+ self.__send_monitor_request()
+ elif msg:
+ logging.error("%s: unexpected message expecting schema: %s"
+ % (self.remote, msg))
+ self.__error()
+
+ def __recv_wait(self, poller):
+ self.session.recv_wait(poller)
+
+ def __send_monitor_request(self):
+ monitor_requests = {}
+ for table in self.schema.tables.itervalues():
+ monitor_requests[table.name] = {"columns": table.columns.keys()}
+ msg = ovs.jsonrpc.Message.create_request(
+ "monitor", [self.db_name, None, monitor_requests])
+ self.session.send(msg)
+ self.state = (lambda: self.__recv_monitor_reply(msg.id),
+ self.__recv_wait)
+
+ def __recv_monitor_reply(self, id):
+ msg = self.session.recv()
+ if msg and msg.type == ovs.jsonrpc.Message.T_REPLY and msg.id == id:
+ try:
+ self.change_seqno += 1
+ self.state = (self.__recv_update, self.__recv_wait)
+ self.__clear()
+ self.__parse_update(msg.result)
+ except error.Error, e:
+ logging.error("%s: parse error in received schema: %s"
+ % (self.remote, e))
+ self.__error()
+ elif msg:
+ logging.error("%s: unexpected message expecting schema: %s"
+ % (self.remote, msg))
+ self.__error()
+
+ def __recv_update(self):
+ msg = self.session.recv()
+ if (msg and msg.type == ovs.jsonrpc.Message.T_NOTIFY and
+ type(msg.params) == list and len(msg.params) == 2 and
+ msg.params[0] is None):
+ self.__parse_update(msg.params[1])
+ elif msg:
+ logging.error("%s: unexpected message expecting update: %s"
+ % (self.remote, msg))
+ self.__error()
+
+ def __error(self):
+ self.session.force_reconnect()
+
+ def __parse_update(self, update):
+ try:
+ self.__do_parse_update(update)
+ except error.Error, e:
+ logging.error("%s: error parsing update: %s" % (self.remote, e))
+
+ def __do_parse_update(self, table_updates):
+ if type(table_updates) != dict:
+ raise error.Error("<table-updates> is not an object",
+ table_updates)
+
+ for table_name, table_update in table_updates.iteritems():
+ table = self.schema.tables.get(table_name)
+ if not table:
+ raise error.Error("<table-updates> includes unknown "
+ "table \"%s\"" % table_name)
+
+ if type(table_update) != dict:
+ raise error.Error("<table-update> for table \"%s\" is not "
+ "an object" % table_name, table_update)
+
+ for uuid_string, row_update in table_update.iteritems():
+ if not ovs.ovsuuid.UUID.is_valid_string(uuid_string):
+ raise error.Error("<table-update> for table \"%s\" "
+ "contains bad UUID \"%s\" as member "
+ "name" % (table_name, uuid_string),
+ table_update)
+ uuid = ovs.ovsuuid.UUID.from_string(uuid_string)
+
+ if type(row_update) != dict:
+ raise error.Error("<table-update> for table \"%s\" "
+ "contains <row-update> for %s that "
+ "is not an object"
+ % (table_name, uuid_string))
+
+ old = row_update.get("old", None)
+ new = row_update.get("new", None)
+
+ if old is not None and type(old) != dict:
+ raise error.Error("\"old\" <row> is not object", old)
+ if new is not None and type(new) != dict:
+ raise error.Error("\"new\" <row> is not object", new)
+ if (old is not None) + (new is not None) != len(row_update):
+ raise error.Error("<row-update> contains unexpected "
+ "member", row_update)
+ if not old and not new:
+ raise error.Error("<row-update> missing \"old\" and "
+ "\"new\" members", row_update)
+
+ if self.__parse_row_update(table, uuid, old, new):
+ self.change_seqno += 1
+
+ def __parse_row_update(self, table, uuid, old, new):
+ """Returns True if a column changed, False otherwise."""
+ row = self.data[table.name].get(uuid)
+ if not new:
+ # Delete row.
+ if row:
+ del self.data[table.name][uuid]
+ else:
+ # XXX rate-limit
+ logging.warning("cannot delete missing row %s from table %s"
+ % (uuid, table.name))
+ return False
+ elif not old:
+ # Insert row.
+ if not row:
+ row = self.__create_row(table, uuid)
+ else:
+ # XXX rate-limit
+ logging.warning("cannot add existing row %s to table %s"
+ % (uuid, table.name))
+ self.__modify_row(table, row, new)
+ else:
+ if not row:
+ row = self.__create_row(table, uuid)
+ # XXX rate-limit
+ logging.warning("cannot modify missing row %s in table %s"
+ % (uuid, table_name))
+ self.__modify_row(table, row, new)
+ return True
+
+ def __modify_row(self, table, row, row_json):
+ changed = False
+ for column_name, datum_json in row_json.iteritems():
+ column = table.columns.get(column_name)
+ if not column:
+ # XXX rate-limit
+ logging.warning("unknown column %s updating table %s"
+ % (column_name, table.name))
+ continue
+
+ try:
+ datum = ovs.db.data.Datum.from_json(column.type, datum_json)
+ except error.Error, e:
+ # XXX rate-limit
+ logging.warning("error parsing column %s in table %s: %s"
+ % (column_name, table_name, e))
+ continue
+
+ if datum != row.__dict__[column_name]:
+ row.__dict__[column_name] = datum
+ changed = True
+ else:
+ # Didn't really change but the OVSDB monitor protocol always
+ # includes every value in a row.
+ pass
+ return changed
+
+ def __clear(self):
+ if self.data != {}:
+ for table_name in self.schema.tables:
+ if self.data[table_name] != {}:
+ self.change_seqno += 1
+ break
+
+ self.data = {}
+ for table_name in self.schema.tables:
+ self.data[table_name] = {}
+
+ def __create_row(self, table, uuid):
+ class Row(object):
+ pass
+ row = self.data[table.name][uuid] = Row()
+ for column in table.columns.itervalues():
+ row.__dict__[column.name] = ovs.db.data.Datum.default(column.type)
+ return row
+
+ def force_reconnect(self):
+ """Forces the IDL to drop its connection to the database and reconnect.
+ In the meantime, the contents of the IDL will not change."""
+ self.session.force_reconnect()