1 # Copyright (c) 2009, 2010, 2011 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
21 from ovs.db import error
26 """Open vSwitch Database Interface Definition Language (OVSDB IDL).
28 The OVSDB IDL maintains an in-memory replica of a database. It issues RPC
29 requests to an OVSDB database server and parses the responses, converting
30 raw JSON into data structures that are easier for clients to digest.
32 The IDL also assists with issuing database transactions. The client
33 creates a transaction, manipulates the IDL data structures, and commits or
34 aborts the transaction. The IDL then composes and issues the necessary
35 JSON-RPC requests and reports to the client whether the transaction
36 completed successfully.
38 The client is allowed to access the following attributes directly, in a
41 - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
42 to the Idl constructor. Each ovs.db.schema.TableSchema in the map is
43 annotated with a new attribute 'rows', which is a dict from a uuid.UUID
46 The client may directly read and write the Row objects referenced by the
47 'rows' map values. Refer to Row for more details.
49 - 'change_seqno': A number that represents the IDL's state. When the IDL
50 is updated (by Idl.run()), its value changes.
52 - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
53 if no lock is configured.
55 - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
56 lock, and False otherwise.
58 Locking and unlocking happens asynchronously from the database client's
59 point of view, so the information is only useful for optimization
60 (e.g. if the client doesn't have the lock then there's no point in trying
61 to write to the database).
63 - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
64 the database server has indicated that some other client already owns the
65 requested lock, and False otherwise.
67 - 'txn': The ovs.db.idl.Transaction object for the database transaction
68 currently being constructed, if there is one, or None otherwise.
71 def __init__(self, remote, schema):
72 """Creates and returns a connection to the database named 'db_name' on
73 'remote', which should be in a form acceptable to
74 ovs.jsonrpc.session.open(). The connection will maintain an in-memory
75 replica of the remote database.
77 'schema' should be the schema for the remote database. The caller may
78 have cut it down by removing tables or columns that are not of
79 interest. The IDL will only replicate the tables and columns that
80 remain. The caller may also add a attribute named 'alert' to selected
81 remaining columns, setting its value to False; if so, then changes to
82 those columns will not be considered changes to the database for the
83 purpose of the return value of Idl.run() and Idl.change_seqno. This is
84 useful for columns that the IDL's client will write but not read.
86 The IDL uses and modifies 'schema' directly."""
88 self.tables = schema.tables
90 self._session = ovs.jsonrpc.Session.open(remote)
91 self._monitor_request_id = None
92 self._last_seqno = None
96 self.lock_name = None # Name of lock we need, None if none.
97 self.has_lock = False # Has db server said we have the lock?
98 self.is_lock_contended = False # Has db server said we can't get lock?
99 self._lock_request_id = None # JSON-RPC ID of in-flight lock request.
101 # Transaction support.
103 self._outstanding_txns = {}
105 for table in schema.tables.itervalues():
106 for column in table.columns.itervalues():
107 if not hasattr(column, 'alert'):
109 table.need_table = False
114 """Closes the connection to the database. The IDL will no longer
116 self._session.close()
119 """Processes a batch of messages from the database server. Returns
120 True if the database as seen through the IDL changed, False if it did
121 not change. The initial fetch of the entire contents of the remote
122 database is considered to be one kind of change. If the IDL has been
123 configured to acquire a database lock (with Idl.set_lock()), then
124 successfully acquiring the lock is also considered to be a change.
126 This function can return occasional false positives, that is, report
127 that the database changed even though it didn't. This happens if the
128 connection to the database drops and reconnects, which causes the
129 database contents to be reloaded even if they didn't change. (It could
130 also happen if the database server sends out a "change" that reflects
131 what we already thought was in the database, but the database server is
132 not supposed to do that.)
134 As an alternative to checking the return value, the client may check
135 for changes in self.change_seqno."""
137 initial_change_seqno = self.change_seqno
142 if not self._session.is_connected():
145 seqno = self._session.get_seqno()
146 if seqno != self._last_seqno:
147 self._last_seqno = seqno
148 self.__txn_abort_all()
149 self.__send_monitor_request()
151 self.__send_lock_request()
154 msg = self._session.recv()
157 if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
158 and msg.method == "update"
159 and len(msg.params) == 2
160 and msg.params[0] == None):
161 # Database contents changed.
162 self.__parse_update(msg.params[1])
163 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
164 and self._monitor_request_id is not None
165 and self._monitor_request_id == msg.id):
166 # Reply to our "monitor" request.
168 self.change_seqno += 1
169 self._monitor_request_id = None
171 self.__parse_update(msg.result)
172 except error.Error, e:
173 logging.error("%s: parse error in received schema: %s"
174 % (self._session.get_name(), e))
176 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
177 and self._lock_request_id is not None
178 and self._lock_request_id == msg.id):
179 # Reply to our "lock" request.
180 self.__parse_lock_reply(msg.result)
181 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
182 and msg.method == "locked"):
184 self.__parse_lock_notify(msg.params, True)
185 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
186 and msg.method == "stolen"):
187 # Someone else stole our lock.
188 self.__parse_lock_notify(msg.params, False)
189 elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
190 # Reply to our echo request. Ignore it.
192 elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
193 ovs.jsonrpc.Message.T_REPLY)
194 and self.__txn_process_reply(msg)):
195 # __txn_process_reply() did everything needed.
198 # This can happen if a transaction is destroyed before we
199 # receive the reply, so keep the log level low.
200 logging.debug("%s: received unexpected %s message"
201 % (self._session.get_name(),
202 ovs.jsonrpc.Message.type_to_string(msg.type)))
204 return initial_change_seqno != self.change_seqno
206 def wait(self, poller):
207 """Arranges for poller.block() to wake up when self.run() has something
208 to do or when activity occurs on a transaction on 'self'."""
209 self._session.wait(poller)
210 self._session.recv_wait(poller)
212 def has_ever_connected(self):
213 """Returns True, if the IDL successfully connected to the remote
214 database and retrieved its contents (even if the connection
215 subsequently dropped and is in the process of reconnecting). If so,
216 then the IDL contains an atomic snapshot of the database's contents
217 (but it might be arbitrarily old if the connection dropped).
219 Returns False if the IDL has never connected or retrieved the
220 database's contents. If so, the IDL is empty."""
221 return self.change_seqno != 0
223 def force_reconnect(self):
224 """Forces the IDL to drop its connection to the database and reconnect.
225 In the meantime, the contents of the IDL will not change."""
226 self._session.force_reconnect()
228 def set_lock(self, lock_name):
229 """If 'lock_name' is not None, configures the IDL to obtain the named
230 lock from the database server and to avoid modifying the database when
231 the lock cannot be acquired (that is, when another client has the same
234 If 'lock_name' is None, drops the locking requirement and releases the
237 assert not self._outstanding_txns
239 if self.lock_name and (not lock_name or lock_name != self.lock_name):
240 # Release previous lock.
241 self.__send_unlock_request()
242 self.lock_name = None
243 self.is_lock_contended = False
245 if lock_name and not self.lock_name:
247 self.lock_name = lock_name
248 self.__send_lock_request()
253 for table in self.tables.itervalues():
259 self.change_seqno += 1
261 def __update_has_lock(self, new_has_lock):
262 if new_has_lock and not self.has_lock:
263 if self._monitor_request_id is None:
264 self.change_seqno += 1
266 # We're waiting for a monitor reply, so don't signal that the
267 # database changed. The monitor reply will increment
268 # change_seqno anyhow.
270 self.is_lock_contended = False
271 self.has_lock = new_has_lock
273 def __do_send_lock_request(self, method):
274 self.__update_has_lock(False)
275 self._lock_request_id = None
276 if self._session.is_connected():
277 msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
279 self._session.send(msg)
284 def __send_lock_request(self):
285 self._lock_request_id = self.__do_send_lock_request("lock")
287 def __send_unlock_request(self):
288 self.__do_send_lock_request("unlock")
290 def __parse_lock_reply(self, result):
291 self._lock_request_id = None
292 got_lock = type(result) == dict and result.get("locked") is True
293 self.__update_has_lock(got_lock)
295 self.is_lock_contended = True
297 def __parse_lock_notify(self, params, new_has_lock):
298 if (self.lock_name is not None
299 and type(params) in (list, tuple)
301 and params[0] == self.lock_name):
302 self.__update_has_lock(self, new_has_lock)
304 self.is_lock_contended = True
306 def __send_monitor_request(self):
307 monitor_requests = {}
308 for table in self.tables.itervalues():
309 monitor_requests[table.name] = {"columns": table.columns.keys()}
310 msg = ovs.jsonrpc.Message.create_request(
311 "monitor", [self._db.name, None, monitor_requests])
312 self._monitor_request_id = msg.id
313 self._session.send(msg)
315 def __parse_update(self, update):
317 self.__do_parse_update(update)
318 except error.Error, e:
319 logging.error("%s: error parsing update: %s"
320 % (self._session.get_name(), e))
322 def __do_parse_update(self, table_updates):
323 if type(table_updates) != dict:
324 raise error.Error("<table-updates> is not an object",
327 for table_name, table_update in table_updates.iteritems():
328 table = self.tables.get(table_name)
330 raise error.Error('<table-updates> includes unknown '
331 'table "%s"' % table_name)
333 if type(table_update) != dict:
334 raise error.Error('<table-update> for table "%s" is not '
335 'an object' % table_name, table_update)
337 for uuid_string, row_update in table_update.iteritems():
338 if not ovs.ovsuuid.is_valid_string(uuid_string):
339 raise error.Error('<table-update> for table "%s" '
340 'contains bad UUID "%s" as member '
341 'name' % (table_name, uuid_string),
343 uuid = ovs.ovsuuid.from_string(uuid_string)
345 if type(row_update) != dict:
346 raise error.Error('<table-update> for table "%s" '
347 'contains <row-update> for %s that '
349 % (table_name, uuid_string))
351 parser = ovs.db.parser.Parser(row_update, "row-update")
352 old = parser.get_optional("old", [dict])
353 new = parser.get_optional("new", [dict])
356 if not old and not new:
357 raise error.Error('<row-update> missing "old" and '
358 '"new" members', row_update)
360 if self.__process_update(table, uuid, old, new):
361 self.change_seqno += 1
363 def __process_update(self, table, uuid, old, new):
364 """Returns True if a column changed, False otherwise."""
365 row = table.rows.get(uuid)
374 logging.warning("cannot delete missing row %s from table %s"
375 % (uuid, table.name))
379 row = self.__create_row(table, uuid)
383 logging.warning("cannot add existing row %s to table %s"
384 % (uuid, table.name))
385 if self.__row_update(table, row, new):
389 row = self.__create_row(table, uuid)
392 logging.warning("cannot modify missing row %s in table %s"
393 % (uuid, table.name))
394 if self.__row_update(table, row, new):
398 def __row_update(self, table, row, row_json):
400 for column_name, datum_json in row_json.iteritems():
401 column = table.columns.get(column_name)
404 logging.warning("unknown column %s updating table %s"
405 % (column_name, table.name))
409 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
410 except error.Error, e:
412 logging.warning("error parsing column %s in table %s: %s"
413 % (column_name, table.name, e))
416 if datum != row._data[column_name]:
417 row._data[column_name] = datum
421 # Didn't really change but the OVSDB monitor protocol always
422 # includes every value in a row.
426 def __create_row(self, table, uuid):
428 for column in table.columns.itervalues():
429 data[column.name] = ovs.db.data.Datum.default(column.type)
430 row = table.rows[uuid] = Row(self, table, uuid, data)
434 self._session.force_reconnect()
436 def __txn_abort_all(self):
437 while self._outstanding_txns:
438 txn = self._outstanding_txns.popitem()[1]
439 txn._status = Transaction.TRY_AGAIN
441 def __txn_process_reply(self, msg):
442 txn = self._outstanding_txns.pop(msg.id, None)
444 txn._process_reply(msg)
446 def _uuid_to_row(atom, base):
448 return base.ref_table.rows.get(atom)
452 def _row_to_uuid(value):
453 if type(value) == Row:
459 """A row within an IDL.
461 The client may access the following attributes directly:
463 - 'uuid': a uuid.UUID object whose value is the row's database UUID.
465 - An attribute for each column in the Row's table, named for the column,
466 whose values are as returned by Datum.to_python() for the column's type.
468 If some error occurs (e.g. the database server's idea of the column is
469 different from the IDL's idea), then the attribute values is the
470 "default" value return by Datum.default() for the column's type. (It is
471 important to know this because the default value may violate constraints
472 for the column's type, e.g. the default integer value is 0 even if column
473 contraints require the column's value to be positive.)
475 When a transaction is active, column attributes may also be assigned new
476 values. Committing the transaction will then cause the new value to be
477 stored into the database.
479 *NOTE*: In the current implementation, the value of a column is a *copy*
480 of the value in the database. This means that modifying its value
481 directly will have no useful effect. For example, the following:
482 row.mycolumn["a"] = "b" # don't do this
483 will not change anything in the database, even after commit. To modify
484 the column, instead assign the modified column value back to the column:
489 def __init__(self, idl, table, uuid, data):
490 # All of the explicit references to self.__dict__ below are required
491 # to set real attributes with invoking self.__getattr__().
492 self.__dict__["uuid"] = uuid
494 self.__dict__["_idl"] = idl
495 self.__dict__["_table"] = table
497 # _data is the committed data. It takes the following values:
499 # - A dictionary that maps every column name to a Datum, if the row
500 # exists in the committed form of the database.
502 # - None, if this row is newly inserted within the active transaction
503 # and thus has no committed form.
504 self.__dict__["_data"] = data
506 # _changes describes changes to this row within the active transaction.
507 # It takes the following values:
509 # - {}, the empty dictionary, if no transaction is active or if the
510 # row has yet not been changed within this transaction.
512 # - A dictionary that maps a column name to its new Datum, if an
513 # active transaction changes those columns' values.
515 # - A dictionary that maps every column name to a Datum, if the row
516 # is newly inserted within the active transaction.
518 # - None, if this transaction deletes this row.
519 self.__dict__["_changes"] = {}
521 # A dictionary whose keys are the names of columns that must be
522 # verified as prerequisites when the transaction commits. The values
523 # in the dictionary are all None.
524 self.__dict__["_prereqs"] = {}
526 def __getattr__(self, column_name):
527 assert self._changes is not None
529 datum = self._changes.get(column_name)
531 datum = self._data[column_name]
533 return datum.to_python(_uuid_to_row)
535 def __setattr__(self, column_name, value):
536 assert self._changes is not None
539 column = self._table.columns[column_name]
541 datum = ovs.db.data.Datum.from_python(column.type, value,
543 except error.Error, e:
545 logging.error("attempting to write bad value to column %s (%s)"
548 self._idl.txn._write(self, column, datum)
550 def verify(self, column_name):
551 """Causes the original contents of column 'column_name' in this row to
552 be verified as a prerequisite to completing the transaction. That is,
553 if 'column_name' changed in this row (or if this row was deleted)
554 between the time that the IDL originally read its contents and the time
555 that the transaction commits, then the transaction aborts and
556 Transaction.commit() returns Transaction.TRY_AGAIN.
558 The intention is that, to ensure that no transaction commits based on
559 dirty reads, an application should call Row.verify() on each data item
560 read as part of a read-modify-write operation.
562 In some cases Row.verify() reduces to a no-op, because the current
563 value of the column is already known:
565 - If this row is a row created by the current transaction (returned
566 by Transaction.insert()).
568 - If the column has already been modified within the current
571 Because of the latter property, always call Row.verify() *before*
572 modifying the column, for a given read-modify-write.
574 A transaction must be in progress."""
576 assert self._changes is not None
577 if not self._data or column_name in self._changes:
580 self._prereqs[column_name] = None
583 """Deletes this row from its table.
585 A transaction must be in progress."""
587 assert self._changes is not None
588 if self._data is None:
589 del self._idl.txn._txn_rows[self.uuid]
590 self.__dict__["_changes"] = None
591 del self._table.rows[self.uuid]
593 def _uuid_name_from_uuid(uuid):
594 return "row%s" % str(uuid).replace("-", "_")
596 def _where_uuid_equals(uuid):
597 return [["_uuid", "==", ["uuid", str(uuid)]]]
599 class _InsertedRow(object):
600 def __init__(self, op_index):
601 self.op_index = op_index
604 class Transaction(object):
605 # Status values that Transaction.commit() can return.
606 UNCOMMITTED = "uncommitted" # Not yet committed or aborted.
607 UNCHANGED = "unchanged" # Transaction didn't include any changes.
608 INCOMPLETE = "incomplete" # Commit in progress, please wait.
609 ABORTED = "aborted" # ovsdb_idl_txn_abort() called.
610 SUCCESS = "success" # Commit successful.
611 TRY_AGAIN = "try again" # Commit failed because a "verify" operation
612 # reported an inconsistency, due to a network
613 # problem, or other transient failure.
614 NOT_LOCKED = "not locked" # Server hasn't given us the lock yet.
615 ERROR = "error" # Commit failed due to a hard error.
618 def status_to_string(status):
619 """Converts one of the status values that Transaction.commit() can
620 return into a human-readable string.
622 (The status values are in fact such strings already, so
623 there's nothing to do.)"""
626 def __init__(self, idl):
627 """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
628 A given Idl may only have a single active transaction at a time.
630 A Transaction may modify the contents of a database by assigning new
631 values to columns (attributes of Row), deleting rows (with
632 Row.delete()), or inserting rows (with Transaction.insert()). It may
633 also check that columns in the database have not changed with
636 When a transaction is complete (which must be before the next call to
637 Idl.run()), call Transaction.commit() or Transaction.abort()."""
638 assert idl.txn is None
641 self._request_id = None
645 self._status = Transaction.UNCOMMITTED
649 self._inc_table = None
650 self._inc_column = None
651 self._inc_where = None
653 self._inserted_rows = {} # Map from UUID to _InsertedRow
655 def add_comment(self, comment):
656 """Appens 'comment' to the comments that will be passed to the OVSDB
657 server when this transaction is committed. (The comment will be
658 committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
659 relatively human-readable form.)"""
660 self._comments.append(comment)
662 def increment(self, table, column, where):
663 assert not self._inc_table
664 self._inc_table = table
665 self._inc_column = column
666 self._inc_where = where
668 def wait(self, poller):
669 if self._status not in (Transaction.UNCOMMITTED,
670 Transaction.INCOMPLETE):
671 poller.immediate_wake()
673 def _substitute_uuids(self, json):
674 if type(json) in (list, tuple):
676 and json[0] == 'uuid'
677 and ovs.ovsuuid.is_valid_string(json[1])):
678 uuid = ovs.ovsuuid.from_string(json[1])
679 row = self._txn_rows.get(uuid, None)
680 if row and row._data is None:
681 return ["named-uuid", _uuid_name_from_uuid(uuid)]
684 def __disassemble(self):
687 for row in self._txn_rows.itervalues():
688 if row._changes is None:
689 row._table.rows[row.uuid] = row
690 elif row._data is None:
691 del row._table.rows[row.uuid]
692 row.__dict__["_changes"] = {}
693 row.__dict__["_prereqs"] = {}
697 """Attempts to commit this transaction and returns the status of the
698 commit operation, one of the constants declared as class attributes.
699 If the return value is Transaction.INCOMPLETE, then the transaction is
700 not yet complete and the caller should try calling again later, after
701 calling Idl.run() to run the Idl.
703 Committing a transaction rolls back all of the changes that it made to
704 the Idl's copy of the database. If the transaction commits
705 successfully, then the database server will send an update and, thus,
706 the Idl will be updated with the committed changes."""
707 # The status can only change if we're the active transaction.
708 # (Otherwise, our status will change only in Idl.run().)
709 if self != self.idl.txn:
712 # If we need a lock but don't have it, give up quickly.
713 if self.idl.lock_name and not self.idl.has_lock():
714 self._status = Transaction.NOT_LOCKED
718 operations = [self.idl._db.name]
720 # Assert that we have the required lock (avoiding a race).
721 if self.idl.lock_name:
722 operations.append({"op": "assert",
723 "lock": self.idl.lock_name})
725 # Add prerequisites and declarations of new rows.
726 for row in self._txn_rows.itervalues():
730 for column_name in row._prereqs:
731 columns.append(column_name)
732 rows[column_name] = row._data[column_name].to_json()
733 operations.append({"op": "wait",
734 "table": row._table.name,
736 "where": _where_uuid_equals(row.uuid),
743 for row in self._txn_rows.itervalues():
744 if row._changes is None:
745 if row._table.is_root:
746 operations.append({"op": "delete",
747 "table": row._table.name,
748 "where": _where_uuid_equals(row.uuid)})
751 # Let ovsdb-server decide whether to really delete it.
754 op = {"table": row._table.name}
755 if row._data is None:
757 op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
760 op_index = len(operations) - 1
761 self._inserted_rows[row.uuid] = _InsertedRow(op_index)
764 op["where"] = _where_uuid_equals(row.uuid)
769 for column_name, datum in row._changes.iteritems():
770 if row._data is not None or not datum.is_default():
771 row_json[column_name] = self._substitute_uuids(datum.to_json())
773 # If anything really changed, consider it an update.
774 # We can't suppress not-really-changed values earlier
775 # or transactions would become nonatomic (see the big
776 # comment inside Transaction._write()).
777 if (not any_updates and row._data is not None and
778 row._data[column_name] != datum):
781 if row._data is None or row_json:
782 operations.append(op)
785 if self._inc_table and any_updates:
786 self._inc_index = len(operations) - 1
788 operations.append({"op": "mutate",
789 "table": self._inc_table,
790 "where": self._substitute_uuids(self._inc_where),
791 "mutations": [[self._inc_column, "+=", 1]]})
792 operations.append({"op": "select",
793 "table": self._inc_table,
794 "where": self._substitute_uuids(self._inc_where),
795 "columns": [self._inc_column]})
799 operations.append({"op": "comment",
800 "comment": "\n".join(self._comments)})
804 operations.append({"op": "abort"})
807 self._status = Transaction.UNCHANGED
809 msg = ovs.jsonrpc.Message.create_request("transact", operations)
810 self._request_id = msg.id
811 if not self.idl._session.send(msg):
812 self.idl._outstanding_txns[self._request_id] = self
813 self._status = Transaction.INCOMPLETE
815 self._status = Transaction.TRY_AGAIN
820 def commit_block(self):
822 status = self.commit()
823 if status != Transaction.INCOMPLETE:
828 poller = ovs.poller.Poller()
829 self.idl.wait(poller)
833 def get_increment_new_value(self):
834 assert self._status == Transaction.SUCCESS
835 return self._inc_new_value
838 """Aborts this transaction. If Transaction.commit() has already been
839 called then the transaction might get committed anyhow."""
841 if self._status in (Transaction.UNCOMMITTED,
842 Transaction.INCOMPLETE):
843 self._status = Transaction.ABORTED
846 """Returns a string representing this transaction's current status,
847 suitable for use in log messages."""
848 if self._status != Transaction.ERROR:
849 return Transaction.status_to_string(self._status)
853 return "no error details available"
855 def __set_error_json(self, json):
856 if self._error is None:
857 self._error = ovs.json.to_string(json)
859 def get_insert_uuid(self, uuid):
860 """Finds and returns the permanent UUID that the database assigned to a
861 newly inserted row, given the UUID that Transaction.insert() assigned
864 Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
865 or if it was assigned by that function and then deleted by Row.delete()
866 within the same transaction. (Rows that are inserted and then deleted
867 within a single transaction are never sent to the database server, so
868 it never assigns them a permanent UUID.)
870 This transaction must have completed successfully."""
871 assert self._status in (Transaction.SUCCESS,
872 Transaction.UNCHANGED)
873 inserted_row = self._inserted_rows.get(uuid)
875 return inserted_row.real
878 def _write(self, row, column, datum):
879 assert row._changes is not None
883 # If this is a write-only column and the datum being written is the
884 # same as the one already there, just skip the update entirely. This
885 # is worth optimizing because we have a lot of columns that get
886 # periodically refreshed into the database but don't actually change
889 # We don't do this for read/write columns because that would break
890 # atomicity of transactions--some other client might have written a
891 # different value in that column since we read it. (But if a whole
892 # transaction only does writes of existing values, without making any
893 # real changes, we will drop the whole transaction later in
894 # ovsdb_idl_txn_commit().)
895 if not column.alert and row._data.get(column.name) == datum:
896 new_value = row._changes.get(column.name)
897 if new_value is None or new_value == datum:
900 txn._txn_rows[row.uuid] = row
901 row._changes[column.name] = datum.copy()
903 def insert(self, table, new_uuid=None):
904 """Inserts and returns a new row in 'table', which must be one of the
905 ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
907 The new row is assigned a provisional UUID. If 'uuid' is None then one
908 is randomly generated; otherwise 'uuid' should specify a randomly
909 generated uuid.UUID not otherwise in use. ovsdb-server will assign a
910 different UUID when 'txn' is committed, but the IDL will replace any
911 uses of the provisional UUID in the data to be to be committed by the
912 UUID assigned by ovsdb-server."""
913 assert self._status == Transaction.UNCOMMITTED
915 new_uuid = uuid.uuid4()
916 row = Row(self.idl, table, new_uuid, None)
917 table.rows[row.uuid] = row
918 self._txn_rows[row.uuid] = row
921 def _process_reply(self, msg):
922 if msg.type == ovs.jsonrpc.Message.T_ERROR:
923 self._status = Transaction.ERROR
924 elif type(msg.result) not in (list, tuple):
926 logging.warning('reply to "transact" is not JSON array')
935 # This isn't an error in itself but indicates that some
936 # prior operation failed, so make sure that we know about
939 elif type(op) == dict:
940 error = op.get("error")
941 if error is not None:
942 if error == "timed out":
944 elif error == "not owner":
946 elif error == "aborted":
950 self.__set_error_json(op)
953 self.__set_error_json(op)
955 logging.warning("operation reply is not JSON null or "
958 if not soft_errors and not hard_errors and not lock_errors:
959 if self._inc_table and not self.__process_inc_reply(ops):
962 for insert in self._inserted_rows.itervalues():
963 if not self.__process_insert_reply(insert, ops):
967 self._status = Transaction.ERROR
969 self._status = Transaction.NOT_LOCKED
971 self._status = Transaction.TRY_AGAIN
973 self._status = Transaction.SUCCESS
976 def __check_json_type(json, types, name):
979 logging.warning("%s is missing" % name)
981 elif type(json) not in types:
983 logging.warning("%s has unexpected type %s" % (name, type(json)))
988 def __process_inc_reply(self, ops):
989 if self._inc_index + 2 > len(ops):
991 logging.warning("reply does not contain enough operations for "
992 "increment (has %d, needs %d)" %
993 (len(ops), self._inc_index + 2))
995 # We know that this is a JSON object because the loop in
996 # __process_reply() already checked.
997 mutate = ops[self._inc_index]
998 count = mutate.get("count")
999 if not Transaction.__check_json_type(count, (int, long),
1000 '"mutate" reply "count"'):
1004 logging.warning('"mutate" reply "count" is %d instead of 1'
1008 select = ops[self._inc_index + 1]
1009 rows = select.get("rows")
1010 if not Transaction.__check_json_type(rows, (list, tuple),
1011 '"select" reply "rows"'):
1015 logging.warning('"select" reply "rows" has %d elements '
1016 'instead of 1' % len(rows))
1019 if not Transaction.__check_json_type(row, (dict,),
1020 '"select" reply row'):
1022 column = row.get(self._inc_column)
1023 if not Transaction.__check_json_type(column, (int, long),
1024 '"select" reply inc column'):
1026 self._inc_new_value = column
1029 def __process_insert_reply(self, insert, ops):
1030 if insert.op_index >= len(ops):
1032 logging.warning("reply does not contain enough operations "
1033 "for insert (has %d, needs %d)"
1034 % (len(ops), insert.op_index))
1037 # We know that this is a JSON object because the loop in
1038 # __process_reply() already checked.
1039 reply = ops[insert.op_index]
1040 json_uuid = reply.get("uuid")
1041 if not Transaction.__check_json_type(json_uuid, (tuple, list),
1042 '"insert" reply "uuid"'):
1046 uuid_ = ovs.ovsuuid.from_json(json_uuid)
1049 logging.warning('"insert" reply "uuid" is not a JSON UUID')