1 # Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
20 from ovs.db import error
25 vlog = ovs.vlog.Vlog("idl")
27 __pychecker__ = 'no-classattr no-objattrs'
31 """Open vSwitch Database Interface Definition Language (OVSDB IDL).
33 The OVSDB IDL maintains an in-memory replica of a database. It issues RPC
34 requests to an OVSDB database server and parses the responses, converting
35 raw JSON into data structures that are easier for clients to digest.
37 The IDL also assists with issuing database transactions. The client
38 creates a transaction, manipulates the IDL data structures, and commits or
39 aborts the transaction. The IDL then composes and issues the necessary
40 JSON-RPC requests and reports to the client whether the transaction
41 completed successfully.
43 The client is allowed to access the following attributes directly, in a
46 - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
47 to the Idl constructor. Each ovs.db.schema.TableSchema in the map is
48 annotated with a new attribute 'rows', which is a dict from a uuid.UUID
51 The client may directly read and write the Row objects referenced by the
52 'rows' map values. Refer to Row for more details.
54 - 'change_seqno': A number that represents the IDL's state. When the IDL
55 is updated (by Idl.run()), its value changes. The sequence number can
56 occasionally change even if the database does not. This happens if the
57 connection to the database drops and reconnects, which causes the
58 database contents to be reloaded even if they didn't change. (It could
59 also happen if the database server sends out a "change" that reflects
60 what the IDL already thought was in the database. The database server is
61 not supposed to do that, but bugs could in theory cause it to do so.)
63 - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
64 if no lock is configured.
66 - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
67 lock, and False otherwise.
69 Locking and unlocking happens asynchronously from the database client's
70 point of view, so the information is only useful for optimization
71 (e.g. if the client doesn't have the lock then there's no point in trying
72 to write to the database).
74 - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
75 the database server has indicated that some other client already owns the
76 requested lock, and False otherwise.
78 - 'txn': The ovs.db.idl.Transaction object for the database transaction
79 currently being constructed, if there is one, or None otherwise.
82 def __init__(self, remote, schema):
83 """Creates and returns a connection to the database named 'db_name' on
84 'remote', which should be in a form acceptable to
85 ovs.jsonrpc.session.open(). The connection will maintain an in-memory
86 replica of the remote database.
88 'schema' should be the schema for the remote database. The caller may
89 have cut it down by removing tables or columns that are not of
90 interest. The IDL will only replicate the tables and columns that
91 remain. The caller may also add an attribute named 'alert' to selected
92 remaining columns, setting its value to False; if so, then changes to
93 those columns will not be considered changes to the database for the
94 purpose of the return value of Idl.run() and Idl.change_seqno. This is
95 useful for columns that the IDL's client will write but not read.
97 As a convenience to users, 'schema' may also be an instance of the
100 The IDL uses and modifies 'schema' directly."""
102 assert isinstance(schema, SchemaHelper)
103 schema = schema.get_idl_schema()
105 self.tables = schema.tables
107 self._session = ovs.jsonrpc.Session.open(remote)
108 self._monitor_request_id = None
109 self._last_seqno = None
110 self.change_seqno = 0
113 self.lock_name = None # Name of lock we need, None if none.
114 self.has_lock = False # Has db server said we have the lock?
115 self.is_lock_contended = False # Has db server said we can't get lock?
116 self._lock_request_id = None # JSON-RPC ID of in-flight lock request.
118 # Transaction support.
120 self._outstanding_txns = {}
122 for table in schema.tables.itervalues():
123 for column in table.columns.itervalues():
124 if not hasattr(column, 'alert'):
126 table.need_table = False
131 """Closes the connection to the database. The IDL will no longer
133 self._session.close()
136 """Processes a batch of messages from the database server. Returns
137 True if the database as seen through the IDL changed, False if it did
138 not change. The initial fetch of the entire contents of the remote
139 database is considered to be one kind of change. If the IDL has been
140 configured to acquire a database lock (with Idl.set_lock()), then
141 successfully acquiring the lock is also considered to be a change.
143 This function can return occasional false positives, that is, report
144 that the database changed even though it didn't. This happens if the
145 connection to the database drops and reconnects, which causes the
146 database contents to be reloaded even if they didn't change. (It could
147 also happen if the database server sends out a "change" that reflects
148 what we already thought was in the database, but the database server is
149 not supposed to do that.)
151 As an alternative to checking the return value, the client may check
152 for changes in self.change_seqno."""
154 initial_change_seqno = self.change_seqno
159 if not self._session.is_connected():
162 seqno = self._session.get_seqno()
163 if seqno != self._last_seqno:
164 self._last_seqno = seqno
165 self.__txn_abort_all()
166 self.__send_monitor_request()
168 self.__send_lock_request()
171 msg = self._session.recv()
174 if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
175 and msg.method == "update"
176 and len(msg.params) == 2
177 and msg.params[0] == None):
178 # Database contents changed.
179 self.__parse_update(msg.params[1])
180 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
181 and self._monitor_request_id is not None
182 and self._monitor_request_id == msg.id):
183 # Reply to our "monitor" request.
185 self.change_seqno += 1
186 self._monitor_request_id = None
188 self.__parse_update(msg.result)
189 except error.Error, e:
190 vlog.err("%s: parse error in received schema: %s"
191 % (self._session.get_name(), e))
193 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
194 and self._lock_request_id is not None
195 and self._lock_request_id == msg.id):
196 # Reply to our "lock" request.
197 self.__parse_lock_reply(msg.result)
198 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
199 and msg.method == "locked"):
201 self.__parse_lock_notify(msg.params, True)
202 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
203 and msg.method == "stolen"):
204 # Someone else stole our lock.
205 self.__parse_lock_notify(msg.params, False)
206 elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
207 # Reply to our echo request. Ignore it.
209 elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
210 ovs.jsonrpc.Message.T_REPLY)
211 and self.__txn_process_reply(msg)):
212 # __txn_process_reply() did everything needed.
215 # This can happen if a transaction is destroyed before we
216 # receive the reply, so keep the log level low.
217 vlog.dbg("%s: received unexpected %s message"
218 % (self._session.get_name(),
219 ovs.jsonrpc.Message.type_to_string(msg.type)))
221 return initial_change_seqno != self.change_seqno
def wait(self, poller):
    """Registers this IDL's wake-up conditions with 'poller'.

    Afterward, poller.block() wakes up when self.run() has something to do
    or when activity occurs on a transaction on 'self'."""
    session = self._session
    session.wait(poller)
    session.recv_wait(poller)
def has_ever_connected(self):
    """Reports whether the IDL has ever held a copy of the database.

    Returns True if the IDL successfully connected to the remote database
    and retrieved its contents, even if the connection subsequently dropped
    and is in the process of reconnecting.  In that case the IDL contains an
    atomic snapshot of the database's contents, although the snapshot might
    be arbitrarily old if the connection dropped.

    Returns False if the IDL has never connected or never retrieved the
    database's contents, in which case the IDL is empty."""
    never_loaded = self.change_seqno == 0
    return not never_loaded
def force_reconnect(self):
    """Makes the IDL drop its database connection and reconnect.

    The contents of the IDL will not change in the meantime."""
    session = self._session
    session.force_reconnect()
245 def set_lock(self, lock_name):
246 """If 'lock_name' is not None, configures the IDL to obtain the named
247 lock from the database server and to avoid modifying the database when
248 the lock cannot be acquired (that is, when another client has the same
251 If 'lock_name' is None, drops the locking requirement and releases the
254 assert not self._outstanding_txns
256 if self.lock_name and (not lock_name or lock_name != self.lock_name):
257 # Release previous lock.
258 self.__send_unlock_request()
259 self.lock_name = None
260 self.is_lock_contended = False
262 if lock_name and not self.lock_name:
264 self.lock_name = lock_name
265 self.__send_lock_request()
270 for table in self.tables.itervalues():
276 self.change_seqno += 1
278 def __update_has_lock(self, new_has_lock):
279 if new_has_lock and not self.has_lock:
280 if self._monitor_request_id is None:
281 self.change_seqno += 1
283 # We're waiting for a monitor reply, so don't signal that the
284 # database changed. The monitor reply will increment
285 # change_seqno anyhow.
287 self.is_lock_contended = False
288 self.has_lock = new_has_lock
290 def __do_send_lock_request(self, method):
291 self.__update_has_lock(False)
292 self._lock_request_id = None
293 if self._session.is_connected():
294 msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
296 self._session.send(msg)
def __send_lock_request(self):
    """Issues a "lock" request and remembers its request ID.

    Whatever __do_send_lock_request() returns is stored in
    self._lock_request_id."""
    request_id = self.__do_send_lock_request("lock")
    self._lock_request_id = request_id
def __send_unlock_request(self):
    """Issues an "unlock" request; the result of the helper is not kept."""
    method = "unlock"
    self.__do_send_lock_request(method)
307 def __parse_lock_reply(self, result):
308 self._lock_request_id = None
309 got_lock = type(result) == dict and result.get("locked") is True
310 self.__update_has_lock(got_lock)
312 self.is_lock_contended = True
def __parse_lock_notify(self, params, new_has_lock):
    """Handles a "locked" or "stolen" notification from the database server.

    'params' is the notification's parameter list.  The notification is
    ignored unless a lock is configured on this IDL and the first element of
    'params' names that lock.

    'new_has_lock' is the lock ownership that the notification implies: True
    if this IDL now owns the lock, False if it no longer does."""
    if (self.lock_name is not None
            and isinstance(params, (list, tuple))
            and params
            and params[0] == self.lock_name):
        # __update_has_lock() is called as a bound method, so 'self' must not
        # be passed explicitly; doing so raised TypeError for every lock
        # notification.
        self.__update_has_lock(new_has_lock)
        if not new_has_lock:
            # Losing the lock means that some other client now holds it.
            self.is_lock_contended = True
def __send_monitor_request(self):
    """Sends a "monitor" request covering every table in self.tables.

    For each table, the request names all of that table's columns.  The
    request's ID is saved in self._monitor_request_id before the request is
    sent on the session."""
    monitor_requests = dict(
        (table.name, {"columns": table.columns.keys()})
        for table in self.tables.itervalues())
    params = [self._db.name, None, monitor_requests]
    msg = ovs.jsonrpc.Message.create_request("monitor", params)
    self._monitor_request_id = msg.id
    self._session.send(msg)
332 def __parse_update(self, update):
334 self.__do_parse_update(update)
335 except error.Error, e:
336 vlog.err("%s: error parsing update: %s"
337 % (self._session.get_name(), e))
339 def __do_parse_update(self, table_updates):
340 if type(table_updates) != dict:
341 raise error.Error("<table-updates> is not an object",
344 for table_name, table_update in table_updates.iteritems():
345 table = self.tables.get(table_name)
347 raise error.Error('<table-updates> includes unknown '
348 'table "%s"' % table_name)
350 if type(table_update) != dict:
351 raise error.Error('<table-update> for table "%s" is not '
352 'an object' % table_name, table_update)
354 for uuid_string, row_update in table_update.iteritems():
355 if not ovs.ovsuuid.is_valid_string(uuid_string):
356 raise error.Error('<table-update> for table "%s" '
357 'contains bad UUID "%s" as member '
358 'name' % (table_name, uuid_string),
360 uuid = ovs.ovsuuid.from_string(uuid_string)
362 if type(row_update) != dict:
363 raise error.Error('<table-update> for table "%s" '
364 'contains <row-update> for %s that '
366 % (table_name, uuid_string))
368 parser = ovs.db.parser.Parser(row_update, "row-update")
369 old = parser.get_optional("old", [dict])
370 new = parser.get_optional("new", [dict])
373 if not old and not new:
374 raise error.Error('<row-update> missing "old" and '
375 '"new" members', row_update)
377 if self.__process_update(table, uuid, old, new):
378 self.change_seqno += 1
380 def __process_update(self, table, uuid, old, new):
381 """Returns True if a column changed, False otherwise."""
382 row = table.rows.get(uuid)
391 vlog.warn("cannot delete missing row %s from table %s"
392 % (uuid, table.name))
396 row = self.__create_row(table, uuid)
400 vlog.warn("cannot add existing row %s to table %s"
401 % (uuid, table.name))
402 if self.__row_update(table, row, new):
406 row = self.__create_row(table, uuid)
409 vlog.warn("cannot modify missing row %s in table %s"
410 % (uuid, table.name))
411 if self.__row_update(table, row, new):
415 def __row_update(self, table, row, row_json):
417 for column_name, datum_json in row_json.iteritems():
418 column = table.columns.get(column_name)
421 vlog.warn("unknown column %s updating table %s"
422 % (column_name, table.name))
426 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
427 except error.Error, e:
429 vlog.warn("error parsing column %s in table %s: %s"
430 % (column_name, table.name, e))
433 if datum != row._data[column_name]:
434 row._data[column_name] = datum
438 # Didn't really change but the OVSDB monitor protocol always
439 # includes every value in a row.
443 def __create_row(self, table, uuid):
445 for column in table.columns.itervalues():
446 data[column.name] = ovs.db.data.Datum.default(column.type)
447 row = table.rows[uuid] = Row(self, table, uuid, data)
451 self._session.force_reconnect()
def __txn_abort_all(self):
    """Empties self._outstanding_txns, marking each removed transaction's
    status as Transaction.TRY_AGAIN."""
    pending = self._outstanding_txns
    while pending:
        _, txn = pending.popitem()
        txn._status = Transaction.TRY_AGAIN
458 def __txn_process_reply(self, msg):
459 txn = self._outstanding_txns.pop(msg.id, None)
461 txn._process_reply(msg)
464 def _uuid_to_row(atom, base):
466 return base.ref_table.rows.get(atom)
471 def _row_to_uuid(value):
472 if type(value) == Row:
479 """A row within an IDL.
481 The client may access the following attributes directly:
483 - 'uuid': a uuid.UUID object whose value is the row's database UUID.
485 - An attribute for each column in the Row's table, named for the column,
486 whose values are as returned by Datum.to_python() for the column's type.
488 If some error occurs (e.g. the database server's idea of the column is
489 different from the IDL's idea), then the attribute's value is the
490 "default" value returned by Datum.default() for the column's type. (It is
491 important to know this because the default value may violate constraints
492 for the column's type, e.g. the default integer value is 0 even if column
493 constraints require the column's value to be positive.)
495 When a transaction is active, column attributes may also be assigned new
496 values. Committing the transaction will then cause the new value to be
497 stored into the database.
499 *NOTE*: In the current implementation, the value of a column is a *copy*
500 of the value in the database. This means that modifying its value
501 directly will have no useful effect. For example, the following:
502 row.mycolumn["a"] = "b" # don't do this
503 will not change anything in the database, even after commit. To modify
504 the column, instead assign the modified column value back to the column:
def __init__(self, idl, table, uuid, data):
    # Attributes are stored straight into self.__dict__ because ordinary
    # attribute assignment on a Row goes through Row.__setattr__(), which
    # treats the assignment as a write to a database column.
    self.__dict__.update({
        # The row's database UUID.
        "uuid": uuid,

        "_idl": idl,
        "_table": table,

        # _data is the committed data.  It is one of:
        #
        #   - A dictionary that maps every column name to a Datum, if the
        #     row exists in the committed form of the database.
        #
        #   - None, if this row is newly inserted within the active
        #     transaction and thus has no committed form.
        "_data": data,

        # _changes describes changes to this row within the active
        # transaction.  It is one of:
        #
        #   - {}, the empty dictionary, if no transaction is active or if
        #     the row has not yet been changed within this transaction.
        #
        #   - A dictionary that maps a column name to its new Datum, if an
        #     active transaction changes those columns' values.
        #
        #   - A dictionary that maps every column name to a Datum, if the
        #     row is newly inserted within the active transaction.
        #
        #   - None, if this transaction deletes this row.
        "_changes": {},

        # The keys of _prereqs are the names of columns that must be
        # verified as prerequisites when the transaction commits.  The
        # values are all None.
        "_prereqs": {},
    })
546 def __getattr__(self, column_name):
547 assert self._changes is not None
549 datum = self._changes.get(column_name)
551 if self._data is None:
552 raise AttributeError("%s instance has no attribute '%s'" %
553 (self.__class__.__name__, column_name))
554 datum = self._data[column_name]
556 return datum.to_python(_uuid_to_row)
558 def __setattr__(self, column_name, value):
559 assert self._changes is not None
562 column = self._table.columns[column_name]
564 datum = ovs.db.data.Datum.from_python(column.type, value,
566 except error.Error, e:
568 vlog.err("attempting to write bad value to column %s (%s)"
571 self._idl.txn._write(self, column, datum)
573 def verify(self, column_name):
574 """Causes the original contents of column 'column_name' in this row to
575 be verified as a prerequisite to completing the transaction. That is,
576 if 'column_name' changed in this row (or if this row was deleted)
577 between the time that the IDL originally read its contents and the time
578 that the transaction commits, then the transaction aborts and
579 Transaction.commit() returns Transaction.TRY_AGAIN.
581 The intention is that, to ensure that no transaction commits based on
582 dirty reads, an application should call Row.verify() on each data item
583 read as part of a read-modify-write operation.
585 In some cases Row.verify() reduces to a no-op, because the current
586 value of the column is already known:
588 - If this row is a row created by the current transaction (returned
589 by Transaction.insert()).
591 - If the column has already been modified within the current
594 Because of the latter property, always call Row.verify() *before*
595 modifying the column, for a given read-modify-write.
597 A transaction must be in progress."""
599 assert self._changes is not None
600 if not self._data or column_name in self._changes:
603 self._prereqs[column_name] = None
606 """Deletes this row from its table.
608 A transaction must be in progress."""
610 assert self._changes is not None
611 if self._data is None:
612 del self._idl.txn._txn_rows[self.uuid]
614 self._idl.txn._txn_rows[self.uuid] = self
615 self.__dict__["_changes"] = None
616 del self._table.rows[self.uuid]
def increment(self, column_name):
    """Arranges for the transaction, when it commits, to add 1 to the value
    of 'column_name' within this row.  'column_name' must have an integer
    type.  Once the transaction has committed successfully, the client may
    obtain the final (incremented) value of 'column_name' from
    Transaction.get_increment_new_value().

    A client could get a similar effect by reading, writing, and
    verify()ing columns, but increment() will never (by itself) cause a
    transaction to fail because of a verify error.

    The intended use is for incrementing the "next_cfg" column in the
    Open_vSwitch table."""
    txn = self._idl.txn
    txn._increment(self, column_name)
def _uuid_name_from_uuid(uuid):
    """Returns the name built from 'uuid': the string "row" followed by
    str(uuid) with every "-" replaced by "_"."""
    underscored = "_".join(str(uuid).split("-"))
    return "row" + underscored
def _where_uuid_equals(uuid):
    """Returns a one-element list holding the condition
    ["_uuid", "==", ["uuid", str(uuid)]], for use as the "where" member of
    an operation that targets the row with the given 'uuid'."""
    uuid_json = ["uuid", str(uuid)]
    condition = ["_uuid", "==", uuid_json]
    return [condition]
642 class _InsertedRow(object):
643 def __init__(self, op_index):
644 self.op_index = op_index
648 class Transaction(object):
649 """A transaction may modify the contents of a database by modifying the
650 values of columns, deleting rows, inserting rows, or adding checks that
651 columns in the database have not changed ("verify" operations), through
654 Reading and writing columns and inserting and deleting rows are all
655 straightforward. The reasons to verify columns are less obvious.
656 Verification is the key to maintaining transactional integrity. Because
657 OVSDB handles multiple clients, it can happen that between the time that
658 OVSDB client A reads a column and writes a new value, OVSDB client B has
659 written that column. Client A's write should not ordinarily overwrite
660 client B's, especially if the column in question is a "map" column that
661 contains several more or less independent data items. If client A adds a
662 "verify" operation before it writes the column, then the transaction fails
663 in case client B modifies it first. Client A will then see the new value
664 of the column and compose a new transaction based on the new contents
667 When a transaction is complete, which must be before the next call to
668 Idl.run(), call Transaction.commit() or Transaction.abort().
670 The life-cycle of a transaction looks like this:
672 1. Create the transaction and record the initial sequence number:
674 seqno = idl.change_seqno
675 txn = Transaction(idl)
677 2. Modify the database with Row and Transaction methods.
679 3. Commit the transaction by calling Transaction.commit(). The first call
680 to this function probably returns Transaction.INCOMPLETE. The client
681 must keep calling again as long as this remains true, calling Idl.run() in
682 between to let the IDL do protocol processing. (If the client doesn't
683 have anything else to do in the meantime, it can use
684 Transaction.commit_block() to avoid having to loop itself.)
686 4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
687 to change from the saved 'seqno' (it's possible that it's already
688 changed, in which case the client should not wait at all), then start
689 over from step 1. Only a call to Idl.run() will change the return value
690 of Idl.change_seqno. (Transaction.commit_block() calls Idl.run().)"""
692 # Status values that Transaction.commit() can return.
693 UNCOMMITTED = "uncommitted" # Not yet committed or aborted.
694 UNCHANGED = "unchanged" # Transaction didn't include any changes.
695 INCOMPLETE = "incomplete" # Commit in progress, please wait.
696 ABORTED = "aborted" # ovsdb_idl_txn_abort() called.
697 SUCCESS = "success" # Commit successful.
698 TRY_AGAIN = "try again" # Commit failed because a "verify" operation
699 # reported an inconsistency, due to a network
700 # problem, or other transient failure. Wait
701 # for a change, then try again.
702 NOT_LOCKED = "not locked" # Server hasn't given us the lock yet.
703 ERROR = "error" # Commit failed due to a hard error.
706 def status_to_string(status):
707 """Converts one of the status values that Transaction.commit() can
708 return into a human-readable string.
710 (The status values are in fact such strings already, so
711 there's nothing to do.)"""
714 def __init__(self, idl):
715 """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
716 A given Idl may only have a single active transaction at a time.
718 A Transaction may modify the contents of a database by assigning new
719 values to columns (attributes of Row), deleting rows (with
720 Row.delete()), or inserting rows (with Transaction.insert()). It may
721 also check that columns in the database have not changed with
724 When a transaction is complete (which must be before the next call to
725 Idl.run()), call Transaction.commit() or Transaction.abort()."""
726 assert idl.txn is None
729 self._request_id = None
733 self._status = Transaction.UNCOMMITTED
736 self._commit_seqno = self.idl.change_seqno
739 self._inc_column = None
741 self._inserted_rows = {} # Map from UUID to _InsertedRow
def add_comment(self, comment):
    """Appends 'comment' to the comments that will be passed to the OVSDB
    server when this transaction is committed.  (The comment will be
    committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
    relatively human-readable form.)"""
    comments = self._comments
    comments.append(comment)
750 def wait(self, poller):
751 """Causes poll_block() to wake up if this transaction has completed
753 if self._status not in (Transaction.UNCOMMITTED,
754 Transaction.INCOMPLETE):
755 poller.immediate_wake()
757 def _substitute_uuids(self, json):
758 if type(json) in (list, tuple):
760 and json[0] == 'uuid'
761 and ovs.ovsuuid.is_valid_string(json[1])):
762 uuid = ovs.ovsuuid.from_string(json[1])
763 row = self._txn_rows.get(uuid, None)
764 if row and row._data is None:
765 return ["named-uuid", _uuid_name_from_uuid(uuid)]
767 return [self._substitute_uuids(elem) for elem in json]
770 def __disassemble(self):
773 for row in self._txn_rows.itervalues():
774 if row._changes is None:
775 row._table.rows[row.uuid] = row
776 elif row._data is None:
777 del row._table.rows[row.uuid]
778 row.__dict__["_changes"] = {}
779 row.__dict__["_prereqs"] = {}
783 """Attempts to commit 'txn'. Returns the status of the commit
784 operation, one of the following constants:
786 Transaction.INCOMPLETE:
788 The transaction is in progress, but not yet complete. The caller
789 should call again later, after calling Idl.run() to let the
790 IDL do OVSDB protocol processing.
792 Transaction.UNCHANGED:
794 The transaction is complete. (It didn't actually change the
795 database, so the IDL didn't send any request to the database
800 The caller previously called Transaction.abort().
804 The transaction was successful. The update made by the
805 transaction (and possibly other changes made by other database
806 clients) should already be visible in the IDL.
808 Transaction.TRY_AGAIN:
810 The transaction failed for some transient reason, e.g. because a
811 "verify" operation reported an inconsistency or due to a network
812 problem. The caller should wait for a change to the database,
813 then compose a new transaction, and commit the new transaction.
815 Use Idl.change_seqno to wait for a change in the database. It is
816 important to use its value *before* the initial call to
817 Transaction.commit() as the baseline for this purpose, because
818 the change that one should wait for can happen after the initial
819 call but before the call that returns Transaction.TRY_AGAIN, and
820 using some other baseline value in that situation could cause an
821 indefinite wait if the database rarely changes.
823 Transaction.NOT_LOCKED:
825 The transaction failed because the IDL has been configured to
826 require a database lock (with Idl.set_lock()) but didn't
827 get it yet or has already lost it.
829 Committing a transaction rolls back all of the changes that it made to
830 the IDL's copy of the database. If the transaction commits
831 successfully, then the database server will send an update and, thus,
832 the IDL will be updated with the committed changes."""
833 # The status can only change if we're the active transaction.
834 # (Otherwise, our status will change only in Idl.run().)
835 if self != self.idl.txn:
838 # If we need a lock but don't have it, give up quickly.
839 if self.idl.lock_name and not self.idl.has_lock():
840 self._status = Transaction.NOT_LOCKED
844 operations = [self.idl._db.name]
846 # Assert that we have the required lock (avoiding a race).
847 if self.idl.lock_name:
848 operations.append({"op": "assert",
849 "lock": self.idl.lock_name})
851 # Add prerequisites and declarations of new rows.
852 for row in self._txn_rows.itervalues():
856 for column_name in row._prereqs:
857 columns.append(column_name)
858 rows[column_name] = row._data[column_name].to_json()
859 operations.append({"op": "wait",
860 "table": row._table.name,
862 "where": _where_uuid_equals(row.uuid),
869 for row in self._txn_rows.itervalues():
870 if row._changes is None:
871 if row._table.is_root:
872 operations.append({"op": "delete",
873 "table": row._table.name,
874 "where": _where_uuid_equals(row.uuid)})
877 # Let ovsdb-server decide whether to really delete it.
880 op = {"table": row._table.name}
881 if row._data is None:
883 op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
886 op_index = len(operations) - 1
887 self._inserted_rows[row.uuid] = _InsertedRow(op_index)
890 op["where"] = _where_uuid_equals(row.uuid)
895 for column_name, datum in row._changes.iteritems():
896 if row._data is not None or not datum.is_default():
897 row_json[column_name] = (
898 self._substitute_uuids(datum.to_json()))
900 # If anything really changed, consider it an update.
901 # We can't suppress not-really-changed values earlier
902 # or transactions would become nonatomic (see the big
903 # comment inside Transaction._write()).
904 if (not any_updates and row._data is not None and
905 row._data[column_name] != datum):
908 if row._data is None or row_json:
909 operations.append(op)
912 if self._inc_row and any_updates:
913 self._inc_index = len(operations) - 1
915 operations.append({"op": "mutate",
916 "table": self._inc_row._table.name,
917 "where": self._substitute_uuids(
918 _where_uuid_equals(self._inc_row.uuid)),
919 "mutations": [[self._inc_column, "+=", 1]]})
920 operations.append({"op": "select",
921 "table": self._inc_row._table.name,
922 "where": self._substitute_uuids(
923 _where_uuid_equals(self._inc_row.uuid)),
924 "columns": [self._inc_column]})
928 operations.append({"op": "comment",
929 "comment": "\n".join(self._comments)})
933 operations.append({"op": "abort"})
936 self._status = Transaction.UNCHANGED
938 msg = ovs.jsonrpc.Message.create_request("transact", operations)
939 self._request_id = msg.id
940 if not self.idl._session.send(msg):
941 self.idl._outstanding_txns[self._request_id] = self
942 self._status = Transaction.INCOMPLETE
944 self._status = Transaction.TRY_AGAIN
949 def commit_block(self):
950 """Attempts to commit this transaction, blocking until the commit
951 either succeeds or fails. Returns the final commit status, which may
952 be any Transaction.* value other than Transaction.INCOMPLETE.
954 This function calls Idl.run() on this transaction's IDL, so it may
955 cause Idl.change_seqno to change."""
957 status = self.commit()
958 if status != Transaction.INCOMPLETE:
963 poller = ovs.poller.Poller()
964 self.idl.wait(poller)
def get_increment_new_value(self):
    """Returns the final (incremented) value of the column that this
    transaction was asked to increment with Row.increment().

    This transaction must have committed successfully."""
    committed = self._status == Transaction.SUCCESS
    assert committed
    return self._inc_new_value
976 """Aborts this transaction. If Transaction.commit() has already been
977 called then the transaction might get committed anyhow."""
979 if self._status in (Transaction.UNCOMMITTED,
980 Transaction.INCOMPLETE):
981 self._status = Transaction.ABORTED
984 """Returns a string representing this transaction's current status,
985 suitable for use in log messages."""
986 if self._status != Transaction.ERROR:
987 return Transaction.status_to_string(self._status)
991 return "no error details available"
def __set_error_json(self, json):
    """Stores 'json', converted to a string, in self._error, unless
    self._error has already been set."""
    if self._error is not None:
        # Keep the error that was recorded first.
        return
    self._error = ovs.json.to_string(json)
997 def get_insert_uuid(self, uuid):
998 """Finds and returns the permanent UUID that the database assigned to a
999 newly inserted row, given the UUID that Transaction.insert() assigned
1000 locally to that row.
1002 Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
1003 or if it was assigned by that function and then deleted by Row.delete()
1004 within the same transaction. (Rows that are inserted and then deleted
1005 within a single transaction are never sent to the database server, so
1006 it never assigns them a permanent UUID.)
1008 This transaction must have completed successfully."""
1009 assert self._status in (Transaction.SUCCESS,
1010 Transaction.UNCHANGED)
1011 inserted_row = self._inserted_rows.get(uuid)
1013 return inserted_row.real
1016 def _increment(self, row, column):
1017 assert not self._inc_row
1019 self._inc_column = column
1021 def _write(self, row, column, datum):
1022 assert row._changes is not None
1026 # If this is a write-only column and the datum being written is the
1027 # same as the one already there, just skip the update entirely. This
1028 # is worth optimizing because we have a lot of columns that get
1029 # periodically refreshed into the database but don't actually change
1032 # We don't do this for read/write columns because that would break
1033 # atomicity of transactions--some other client might have written a
1034 # different value in that column since we read it. (But if a whole
1035 # transaction only does writes of existing values, without making any
1036 # real changes, we will drop the whole transaction later in
1037 # ovsdb_idl_txn_commit().)
1038 if not column.alert and row._data.get(column.name) == datum:
1039 new_value = row._changes.get(column.name)
1040 if new_value is None or new_value == datum:
1043 txn._txn_rows[row.uuid] = row
1044 row._changes[column.name] = datum.copy()
1046 def insert(self, table, new_uuid=None):
1047 """Inserts and returns a new row in 'table', which must be one of the
1048 ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
1050 The new row is assigned a provisional UUID. If 'new_uuid' is None then
1051 one is randomly generated; otherwise 'new_uuid' should specify a randomly
1052 generated uuid.UUID not otherwise in use. ovsdb-server will assign a
1053 different UUID when this transaction is committed, but the IDL will
1054 replace any uses of the provisional UUID in the data to be committed by the
1055 UUID assigned by ovsdb-server."""
1056 assert self._status == Transaction.UNCOMMITTED
1057 if new_uuid is None:
1058 new_uuid = uuid.uuid4()
1059 row = Row(self.idl, table, new_uuid, None)
1060 table.rows[row.uuid] = row
1061 self._txn_rows[row.uuid] = row
1064 def _process_reply(self, msg):
1065 if msg.type == ovs.jsonrpc.Message.T_ERROR:
1066 self._status = Transaction.ERROR
1067 elif type(msg.result) not in (list, tuple):
1069 vlog.warn('reply to "transact" is not JSON array')
1078 # This isn't an error in itself but indicates that some
1079 # prior operation failed, so make sure that we know about
1082 elif type(op) == dict:
1083 error = op.get("error")
1084 if error is not None:
1085 if error == "timed out":
1087 elif error == "not owner":
1089 elif error == "aborted":
1093 self.__set_error_json(op)
1096 self.__set_error_json(op)
1098 vlog.warn("operation reply is not JSON null or object")
1100 if not soft_errors and not hard_errors and not lock_errors:
1101 if self._inc_row and not self.__process_inc_reply(ops):
1104 for insert in self._inserted_rows.itervalues():
1105 if not self.__process_insert_reply(insert, ops):
1109 self._status = Transaction.ERROR
1111 self._status = Transaction.NOT_LOCKED
1113 self._status = Transaction.TRY_AGAIN
1115 self._status = Transaction.SUCCESS
1118 def __check_json_type(json, types, name):
1121 vlog.warn("%s is missing" % name)
1123 elif type(json) not in types:
1125 vlog.warn("%s has unexpected type %s" % (name, type(json)))
1130 def __process_inc_reply(self, ops):
1131 if self._inc_index + 2 > len(ops):
1133 vlog.warn("reply does not contain enough operations for "
1134 "increment (has %d, needs %d)" %
1135 (len(ops), self._inc_index + 2))
1137 # We know that this is a JSON object because the loop in
1138 # __process_reply() already checked.
1139 mutate = ops[self._inc_index]
1140 count = mutate.get("count")
1141 if not Transaction.__check_json_type(count, (int, long),
1142 '"mutate" reply "count"'):
1146 vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
1149 select = ops[self._inc_index + 1]
1150 rows = select.get("rows")
1151 if not Transaction.__check_json_type(rows, (list, tuple),
1152 '"select" reply "rows"'):
1156 vlog.warn('"select" reply "rows" has %d elements '
1157 'instead of 1' % len(rows))
1160 if not Transaction.__check_json_type(row, (dict,),
1161 '"select" reply row'):
1163 column = row.get(self._inc_column)
1164 if not Transaction.__check_json_type(column, (int, long),
1165 '"select" reply inc column'):
1167 self._inc_new_value = column
1170 def __process_insert_reply(self, insert, ops):
1171 if insert.op_index >= len(ops):
1173 vlog.warn("reply does not contain enough operations "
1174 "for insert (has %d, needs %d)"
1175 % (len(ops), insert.op_index))
1178 # We know that this is a JSON object because the loop in
1179 # __process_reply() already checked.
1180 reply = ops[insert.op_index]
1181 json_uuid = reply.get("uuid")
1182 if not Transaction.__check_json_type(json_uuid, (tuple, list),
1183 '"insert" reply "uuid"'):
1187 uuid_ = ovs.ovsuuid.from_json(json_uuid)
1190 vlog.warn('"insert" reply "uuid" is not a JSON UUID')
1197 class SchemaHelper(object):
1198 """IDL Schema helper.
1200 This class encapsulates the logic required to generate schemas suitable
1201 for creating 'ovs.db.idl.Idl' objects. Clients should register columns
1202 they are interested in using register_columns(). When finished, the
1203 get_idl_schema() function may be called.
1205 The location on disk of the schema used may be found in the
1206 'schema_location' variable."""
1208 def __init__(self, location=None, schema_json=None):
1209 """Creates a new Schema object.
1211 'location' file path to ovs schema. None means default location
1212 'schema_json' schema in json representation in memory
1215 if location and schema_json:
1216 raise ValueError("both location and schema_json can't be "
1217 "specified. it's ambiguous.")
1218 if schema_json is None:
1219 if location is None:
1220 location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
1221 schema_json = ovs.json.from_file(location)
1223 self.schema_json = schema_json
def register_columns(self, table, columns):
    """Registers interest in the given 'columns' of 'table'.  Future calls
    to get_idl_schema() will include 'table':column for each column in
    'columns'.  Columns registered for 'table' by earlier calls are kept,
    and duplicate entries are avoided because the columns are stored as a
    set.

    'table' must be a string.
    'columns' must be a list of strings.
    """
    # isinstance() rather than an exact type() comparison, so that
    # subclasses of str and list are accepted too.
    assert isinstance(table, str)
    assert isinstance(columns, list)

    # NOTE(review): an empty set stored for a table means "all columns"
    # (see register_table()), so registering specific columns after
    # register_table() narrows the table to just those columns -- confirm
    # that this is intended.
    previously_registered = self._tables.get(table, set())
    self._tables[table] = set(columns) | previously_registered
def register_table(self, table):
    """Registers interest in all columns of 'table'.  Future calls to
    get_idl_schema() will include all columns of 'table'.

    Any set of columns previously registered for 'table' is replaced.

    'table' must be a string.
    """
    # isinstance() rather than an exact type() comparison, so that
    # subclasses of str are accepted too.
    assert isinstance(table, str)
    self._tables[table] = set()  # empty set means all columns in the table
1252 def register_all(self):
1253 """Registers interest in every column of every table."""
1256 def get_idl_schema(self):
1257 """Gets a schema appropriate for the creation of an 'ovs.db.idl.Idl'
1258 object based on columns registered using the register_columns()
1261 schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
1262 self.schema_json = None
1266 for table, columns in self._tables.iteritems():
1267 schema_tables[table] = (
1268 self._keep_table_columns(schema, table, columns))
1270 schema.tables = schema_tables
1273 def _keep_table_columns(self, schema, table_name, columns):
1274 assert table_name in schema.tables
1275 table = schema.tables[table_name]
1278 # empty set means all columns in the table
1282 for column_name in columns:
1283 assert type(column_name) is str
1284 assert column_name in table.columns
1286 new_columns[column_name] = table.columns[column_name]
1288 table.columns = new_columns