ovsdb-idl: Prevent occasional hang when multiple database clients race.
[sliver-openvswitch.git] / python / ovs / db / idl.py
1 # Copyright (c) 2009, 2010, 2011 Nicira Networks
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import uuid
16
17 import ovs.jsonrpc
18 import ovs.db.parser
19 import ovs.db.schema
20 from ovs.db import error
21 import ovs.ovsuuid
22 import ovs.poller
23 import ovs.vlog
24
25 vlog = ovs.vlog.Vlog("idl")
26
27 __pychecker__ = 'no-classattr no-objattrs'
28
29
30 class Idl:
31     """Open vSwitch Database Interface Definition Language (OVSDB IDL).
32
33     The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC
34     requests to an OVSDB database server and parses the responses, converting
35     raw JSON into data structures that are easier for clients to digest.
36
37     The IDL also assists with issuing database transactions.  The client
38     creates a transaction, manipulates the IDL data structures, and commits or
39     aborts the transaction.  The IDL then composes and issues the necessary
40     JSON-RPC requests and reports to the client whether the transaction
41     completed successfully.
42
43     The client is allowed to access the following attributes directly, in a
44     read-only fashion:
45
46     - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
47       to the Idl constructor.  Each ovs.db.schema.TableSchema in the map is
48       annotated with a new attribute 'rows', which is a dict from a uuid.UUID
49       to a Row object.
50
51       The client may directly read and write the Row objects referenced by the
52       'rows' map values.  Refer to Row for more details.
53
54     - 'change_seqno': A number that represents the IDL's state.  When the IDL
55       is updated (by Idl.run()), its value changes.
56
57     - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
58       if no lock is configured.
59
60     - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
61       lock, and False otherwise.
62
63       Locking and unlocking happens asynchronously from the database client's
64       point of view, so the information is only useful for optimization
65       (e.g. if the client doesn't have the lock then there's no point in trying
66       to write to the database).
67
68     - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
69       the database server has indicated that some other client already owns the
70       requested lock, and False otherwise.
71
72     - 'txn': The ovs.db.idl.Transaction object for the database transaction
73       currently being constructed, if there is one, or None otherwise.
74 """
75
76     def __init__(self, remote, schema):
77         """Creates and returns a connection to the database named 'db_name' on
78         'remote', which should be in a form acceptable to
79         ovs.jsonrpc.session.open().  The connection will maintain an in-memory
80         replica of the remote database.
81
82         'schema' should be the schema for the remote database.  The caller may
83         have cut it down by removing tables or columns that are not of
84         interest.  The IDL will only replicate the tables and columns that
85         remain.  The caller may also add a attribute named 'alert' to selected
86         remaining columns, setting its value to False; if so, then changes to
87         those columns will not be considered changes to the database for the
88         purpose of the return value of Idl.run() and Idl.change_seqno.  This is
89         useful for columns that the IDL's client will write but not read.
90
91         The IDL uses and modifies 'schema' directly."""
92
93         self.tables = schema.tables
94         self._db = schema
95         self._session = ovs.jsonrpc.Session.open(remote)
96         self._monitor_request_id = None
97         self._last_seqno = None
98         self.change_seqno = 0
99
100         # Database locking.
101         self.lock_name = None          # Name of lock we need, None if none.
102         self.has_lock = False          # Has db server said we have the lock?
103         self.is_lock_contended = False  # Has db server said we can't get lock?
104         self._lock_request_id = None   # JSON-RPC ID of in-flight lock request.
105
106         # Transaction support.
107         self.txn = None
108         self._outstanding_txns = {}
109
110         for table in schema.tables.itervalues():
111             for column in table.columns.itervalues():
112                 if not hasattr(column, 'alert'):
113                     column.alert = True
114             table.need_table = False
115             table.rows = {}
116             table.idl = self
117
118     def close(self):
119         """Closes the connection to the database.  The IDL will no longer
120         update."""
121         self._session.close()
122
123     def run(self):
124         """Processes a batch of messages from the database server.  Returns
125         True if the database as seen through the IDL changed, False if it did
126         not change.  The initial fetch of the entire contents of the remote
127         database is considered to be one kind of change.  If the IDL has been
128         configured to acquire a database lock (with Idl.set_lock()), then
129         successfully acquiring the lock is also considered to be a change.
130
131         This function can return occasional false positives, that is, report
132         that the database changed even though it didn't.  This happens if the
133         connection to the database drops and reconnects, which causes the
134         database contents to be reloaded even if they didn't change.  (It could
135         also happen if the database server sends out a "change" that reflects
136         what we already thought was in the database, but the database server is
137         not supposed to do that.)
138
139         As an alternative to checking the return value, the client may check
140         for changes in self.change_seqno."""
141         assert not self.txn
142         initial_change_seqno = self.change_seqno
143         self._session.run()
144         i = 0
145         while i < 50:
146             i += 1
147             if not self._session.is_connected():
148                 break
149
150             seqno = self._session.get_seqno()
151             if seqno != self._last_seqno:
152                 self._last_seqno = seqno
153                 self.__txn_abort_all()
154                 self.__send_monitor_request()
155                 if self.lock_name:
156                     self.__send_lock_request()
157                 break
158
159             msg = self._session.recv()
160             if msg is None:
161                 break
162             if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
163                 and msg.method == "update"
164                 and len(msg.params) == 2
165                 and msg.params[0] == None):
166                 # Database contents changed.
167                 self.__parse_update(msg.params[1])
168             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
169                   and self._monitor_request_id is not None
170                   and self._monitor_request_id == msg.id):
171                 # Reply to our "monitor" request.
172                 try:
173                     self.change_seqno += 1
174                     self._monitor_request_id = None
175                     self.__clear()
176                     self.__parse_update(msg.result)
177                 except error.Error, e:
178                     vlog.err("%s: parse error in received schema: %s"
179                               % (self._session.get_name(), e))
180                     self.__error()
181             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
182                   and self._lock_request_id is not None
183                   and self._lock_request_id == msg.id):
184                 # Reply to our "lock" request.
185                 self.__parse_lock_reply(msg.result)
186             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
187                   and msg.method == "locked"):
188                 # We got our lock.
189                 self.__parse_lock_notify(msg.params, True)
190             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
191                   and msg.method == "stolen"):
192                 # Someone else stole our lock.
193                 self.__parse_lock_notify(msg.params, False)
194             elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
195                 # Reply to our echo request.  Ignore it.
196                 pass
197             elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
198                                ovs.jsonrpc.Message.T_REPLY)
199                   and self.__txn_process_reply(msg)):
200                 # __txn_process_reply() did everything needed.
201                 pass
202             else:
203                 # This can happen if a transaction is destroyed before we
204                 # receive the reply, so keep the log level low.
205                 vlog.dbg("%s: received unexpected %s message"
206                          % (self._session.get_name(),
207                              ovs.jsonrpc.Message.type_to_string(msg.type)))
208
209         return initial_change_seqno != self.change_seqno
210
211     def wait(self, poller):
212         """Arranges for poller.block() to wake up when self.run() has something
213         to do or when activity occurs on a transaction on 'self'."""
214         self._session.wait(poller)
215         self._session.recv_wait(poller)
216
217     def has_ever_connected(self):
218         """Returns True, if the IDL successfully connected to the remote
219         database and retrieved its contents (even if the connection
220         subsequently dropped and is in the process of reconnecting).  If so,
221         then the IDL contains an atomic snapshot of the database's contents
222         (but it might be arbitrarily old if the connection dropped).
223
224         Returns False if the IDL has never connected or retrieved the
225         database's contents.  If so, the IDL is empty."""
226         return self.change_seqno != 0
227
228     def force_reconnect(self):
229         """Forces the IDL to drop its connection to the database and reconnect.
230         In the meantime, the contents of the IDL will not change."""
231         self._session.force_reconnect()
232
233     def set_lock(self, lock_name):
234         """If 'lock_name' is not None, configures the IDL to obtain the named
235         lock from the database server and to avoid modifying the database when
236         the lock cannot be acquired (that is, when another client has the same
237         lock).
238
239         If 'lock_name' is None, drops the locking requirement and releases the
240         lock."""
241         assert not self.txn
242         assert not self._outstanding_txns
243
244         if self.lock_name and (not lock_name or lock_name != self.lock_name):
245             # Release previous lock.
246             self.__send_unlock_request()
247             self.lock_name = None
248             self.is_lock_contended = False
249
250         if lock_name and not self.lock_name:
251             # Acquire new lock.
252             self.lock_name = lock_name
253             self.__send_lock_request()
254
255     def __clear(self):
256         changed = False
257
258         for table in self.tables.itervalues():
259             if table.rows:
260                 changed = True
261                 table.rows = {}
262
263         if changed:
264             self.change_seqno += 1
265
266     def __update_has_lock(self, new_has_lock):
267         if new_has_lock and not self.has_lock:
268             if self._monitor_request_id is None:
269                 self.change_seqno += 1
270             else:
271                 # We're waiting for a monitor reply, so don't signal that the
272                 # database changed.  The monitor reply will increment
273                 # change_seqno anyhow.
274                 pass
275             self.is_lock_contended = False
276         self.has_lock = new_has_lock
277
278     def __do_send_lock_request(self, method):
279         self.__update_has_lock(False)
280         self._lock_request_id = None
281         if self._session.is_connected():
282             msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
283             msg_id = msg.id
284             self._session.send(msg)
285         else:
286             msg_id = None
287         return msg_id
288
289     def __send_lock_request(self):
290         self._lock_request_id = self.__do_send_lock_request("lock")
291
292     def __send_unlock_request(self):
293         self.__do_send_lock_request("unlock")
294
295     def __parse_lock_reply(self, result):
296         self._lock_request_id = None
297         got_lock = type(result) == dict and result.get("locked") is True
298         self.__update_has_lock(got_lock)
299         if not got_lock:
300             self.is_lock_contended = True
301
302     def __parse_lock_notify(self, params, new_has_lock):
303         if (self.lock_name is not None
304             and type(params) in (list, tuple)
305             and params
306             and params[0] == self.lock_name):
307             self.__update_has_lock(self, new_has_lock)
308             if not new_has_lock:
309                 self.is_lock_contended = True
310
311     def __send_monitor_request(self):
312         monitor_requests = {}
313         for table in self.tables.itervalues():
314             monitor_requests[table.name] = {"columns": table.columns.keys()}
315         msg = ovs.jsonrpc.Message.create_request(
316             "monitor", [self._db.name, None, monitor_requests])
317         self._monitor_request_id = msg.id
318         self._session.send(msg)
319
320     def __parse_update(self, update):
321         try:
322             self.__do_parse_update(update)
323         except error.Error, e:
324             vlog.err("%s: error parsing update: %s"
325                      % (self._session.get_name(), e))
326
327     def __do_parse_update(self, table_updates):
328         if type(table_updates) != dict:
329             raise error.Error("<table-updates> is not an object",
330                               table_updates)
331
332         for table_name, table_update in table_updates.iteritems():
333             table = self.tables.get(table_name)
334             if not table:
335                 raise error.Error('<table-updates> includes unknown '
336                                   'table "%s"' % table_name)
337
338             if type(table_update) != dict:
339                 raise error.Error('<table-update> for table "%s" is not '
340                                   'an object' % table_name, table_update)
341
342             for uuid_string, row_update in table_update.iteritems():
343                 if not ovs.ovsuuid.is_valid_string(uuid_string):
344                     raise error.Error('<table-update> for table "%s" '
345                                       'contains bad UUID "%s" as member '
346                                       'name' % (table_name, uuid_string),
347                                       table_update)
348                 uuid = ovs.ovsuuid.from_string(uuid_string)
349
350                 if type(row_update) != dict:
351                     raise error.Error('<table-update> for table "%s" '
352                                       'contains <row-update> for %s that '
353                                       'is not an object'
354                                       % (table_name, uuid_string))
355
356                 parser = ovs.db.parser.Parser(row_update, "row-update")
357                 old = parser.get_optional("old", [dict])
358                 new = parser.get_optional("new", [dict])
359                 parser.finish()
360
361                 if not old and not new:
362                     raise error.Error('<row-update> missing "old" and '
363                                       '"new" members', row_update)
364
365                 if self.__process_update(table, uuid, old, new):
366                     self.change_seqno += 1
367
368     def __process_update(self, table, uuid, old, new):
369         """Returns True if a column changed, False otherwise."""
370         row = table.rows.get(uuid)
371         changed = False
372         if not new:
373             # Delete row.
374             if row:
375                 del table.rows[uuid]
376                 changed = True
377             else:
378                 # XXX rate-limit
379                 vlog.warn("cannot delete missing row %s from table %s"
380                           % (uuid, table.name))
381         elif not old:
382             # Insert row.
383             if not row:
384                 row = self.__create_row(table, uuid)
385                 changed = True
386             else:
387                 # XXX rate-limit
388                 vlog.warn("cannot add existing row %s to table %s"
389                           % (uuid, table.name))
390             if self.__row_update(table, row, new):
391                 changed = True
392         else:
393             if not row:
394                 row = self.__create_row(table, uuid)
395                 changed = True
396                 # XXX rate-limit
397                 vlog.warn("cannot modify missing row %s in table %s"
398                           % (uuid, table.name))
399             if self.__row_update(table, row, new):
400                 changed = True
401         return changed
402
403     def __row_update(self, table, row, row_json):
404         changed = False
405         for column_name, datum_json in row_json.iteritems():
406             column = table.columns.get(column_name)
407             if not column:
408                 # XXX rate-limit
409                 vlog.warn("unknown column %s updating table %s"
410                           % (column_name, table.name))
411                 continue
412
413             try:
414                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
415             except error.Error, e:
416                 # XXX rate-limit
417                 vlog.warn("error parsing column %s in table %s: %s"
418                           % (column_name, table.name, e))
419                 continue
420
421             if datum != row._data[column_name]:
422                 row._data[column_name] = datum
423                 if column.alert:
424                     changed = True
425             else:
426                 # Didn't really change but the OVSDB monitor protocol always
427                 # includes every value in a row.
428                 pass
429         return changed
430
431     def __create_row(self, table, uuid):
432         data = {}
433         for column in table.columns.itervalues():
434             data[column.name] = ovs.db.data.Datum.default(column.type)
435         row = table.rows[uuid] = Row(self, table, uuid, data)
436         return row
437
438     def __error(self):
439         self._session.force_reconnect()
440
441     def __txn_abort_all(self):
442         while self._outstanding_txns:
443             txn = self._outstanding_txns.popitem()[1]
444             txn._status = Transaction.AGAIN_WAIT
445
446     def __txn_process_reply(self, msg):
447         txn = self._outstanding_txns.pop(msg.id, None)
448         if txn:
449             txn._process_reply(msg)
450
451
452 def _uuid_to_row(atom, base):
453     if base.ref_table:
454         return base.ref_table.rows.get(atom)
455     else:
456         return atom
457
458
459 def _row_to_uuid(value):
460     if type(value) == Row:
461         return value.uuid
462     else:
463         return value
464
465
466 class Row(object):
467     """A row within an IDL.
468
469     The client may access the following attributes directly:
470
471     - 'uuid': a uuid.UUID object whose value is the row's database UUID.
472
473     - An attribute for each column in the Row's table, named for the column,
474       whose values are as returned by Datum.to_python() for the column's type.
475
476       If some error occurs (e.g. the database server's idea of the column is
477       different from the IDL's idea), then the attribute values is the
478       "default" value return by Datum.default() for the column's type.  (It is
479       important to know this because the default value may violate constraints
480       for the column's type, e.g. the default integer value is 0 even if column
481       contraints require the column's value to be positive.)
482
483       When a transaction is active, column attributes may also be assigned new
484       values.  Committing the transaction will then cause the new value to be
485       stored into the database.
486
487       *NOTE*: In the current implementation, the value of a column is a *copy*
488       of the value in the database.  This means that modifying its value
489       directly will have no useful effect.  For example, the following:
490         row.mycolumn["a"] = "b"              # don't do this
491       will not change anything in the database, even after commit.  To modify
492       the column, instead assign the modified column value back to the column:
493         d = row.mycolumn
494         d["a"] = "b"
495         row.mycolumn = d
496 """
497     def __init__(self, idl, table, uuid, data):
498         # All of the explicit references to self.__dict__ below are required
499         # to set real attributes with invoking self.__getattr__().
500         self.__dict__["uuid"] = uuid
501
502         self.__dict__["_idl"] = idl
503         self.__dict__["_table"] = table
504
505         # _data is the committed data.  It takes the following values:
506         #
507         #   - A dictionary that maps every column name to a Datum, if the row
508         #     exists in the committed form of the database.
509         #
510         #   - None, if this row is newly inserted within the active transaction
511         #     and thus has no committed form.
512         self.__dict__["_data"] = data
513
514         # _changes describes changes to this row within the active transaction.
515         # It takes the following values:
516         #
517         #   - {}, the empty dictionary, if no transaction is active or if the
518         #     row has yet not been changed within this transaction.
519         #
520         #   - A dictionary that maps a column name to its new Datum, if an
521         #     active transaction changes those columns' values.
522         #
523         #   - A dictionary that maps every column name to a Datum, if the row
524         #     is newly inserted within the active transaction.
525         #
526         #   - None, if this transaction deletes this row.
527         self.__dict__["_changes"] = {}
528
529         # A dictionary whose keys are the names of columns that must be
530         # verified as prerequisites when the transaction commits.  The values
531         # in the dictionary are all None.
532         self.__dict__["_prereqs"] = {}
533
534     def __getattr__(self, column_name):
535         assert self._changes is not None
536
537         datum = self._changes.get(column_name)
538         if datum is None:
539             datum = self._data[column_name]
540
541         return datum.to_python(_uuid_to_row)
542
543     def __setattr__(self, column_name, value):
544         assert self._changes is not None
545         assert self._idl.txn
546
547         column = self._table.columns[column_name]
548         try:
549             datum = ovs.db.data.Datum.from_python(column.type, value,
550                                                   _row_to_uuid)
551         except error.Error, e:
552             # XXX rate-limit
553             vlog.err("attempting to write bad value to column %s (%s)"
554                      % (column_name, e))
555             return
556         self._idl.txn._write(self, column, datum)
557
558     def verify(self, column_name):
559         """Causes the original contents of column 'column_name' in this row to
560         be verified as a prerequisite to completing the transaction.  That is,
561         if 'column_name' changed in this row (or if this row was deleted)
562         between the time that the IDL originally read its contents and the time
563         that the transaction commits, then the transaction aborts and
564         Transaction.commit() returns Transaction.AGAIN_WAIT or
565         Transaction.AGAIN_NOW (depending on whether the database change has
566         already been received).
567
568         The intention is that, to ensure that no transaction commits based on
569         dirty reads, an application should call Row.verify() on each data item
570         read as part of a read-modify-write operation.
571
572         In some cases Row.verify() reduces to a no-op, because the current
573         value of the column is already known:
574
575           - If this row is a row created by the current transaction (returned
576             by Transaction.insert()).
577
578           - If the column has already been modified within the current
579             transaction.
580
581         Because of the latter property, always call Row.verify() *before*
582         modifying the column, for a given read-modify-write.
583
584         A transaction must be in progress."""
585         assert self._idl.txn
586         assert self._changes is not None
587         if not self._data or column_name in self._changes:
588             return
589
590         self._prereqs[column_name] = None
591
592     def delete(self):
593         """Deletes this row from its table.
594
595         A transaction must be in progress."""
596         assert self._idl.txn
597         assert self._changes is not None
598         if self._data is None:
599             del self._idl.txn._txn_rows[self.uuid]
600         self.__dict__["_changes"] = None
601         del self._table.rows[self.uuid]
602
603
604 def _uuid_name_from_uuid(uuid):
605     return "row%s" % str(uuid).replace("-", "_")
606
607
608 def _where_uuid_equals(uuid):
609     return [["_uuid", "==", ["uuid", str(uuid)]]]
610
611
612 class _InsertedRow(object):
613     def __init__(self, op_index):
614         self.op_index = op_index
615         self.real = None
616
617
618 class Transaction(object):
619     # Status values that Transaction.commit() can return.
620     UNCOMMITTED = "uncommitted"  # Not yet committed or aborted.
621     UNCHANGED = "unchanged"      # Transaction didn't include any changes.
622     INCOMPLETE = "incomplete"    # Commit in progress, please wait.
623     ABORTED = "aborted"          # ovsdb_idl_txn_abort() called.
624     SUCCESS = "success"          # Commit successful.
625     AGAIN_WAIT = "wait then try again"
626                                  # Commit failed because a "verify" operation
627                                  # reported an inconsistency, due to a network
628                                  # problem, or other transient failure.  Wait
629                                  # for a change, then try again.
630     AGAIN_NOW = "try again now"  # Same as AGAIN_WAIT but try again right away.
631     NOT_LOCKED = "not locked"    # Server hasn't given us the lock yet.
632     ERROR = "error"              # Commit failed due to a hard error.
633
634     @staticmethod
635     def status_to_string(status):
636         """Converts one of the status values that Transaction.commit() can
637         return into a human-readable string.
638
639         (The status values are in fact such strings already, so
640         there's nothing to do.)"""
641         return status
642
643     def __init__(self, idl):
644         """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
645         A given Idl may only have a single active transaction at a time.
646
647         A Transaction may modify the contents of a database by assigning new
648         values to columns (attributes of Row), deleting rows (with
649         Row.delete()), or inserting rows (with Transaction.insert()).  It may
650         also check that columns in the database have not changed with
651         Row.verify().
652
653         When a transaction is complete (which must be before the next call to
654         Idl.run()), call Transaction.commit() or Transaction.abort()."""
655         assert idl.txn is None
656
657         idl.txn = self
658         self._request_id = None
659         self.idl = idl
660         self.dry_run = False
661         self._txn_rows = {}
662         self._status = Transaction.UNCOMMITTED
663         self._error = None
664         self._comments = []
665         self._commit_seqno = self.idl.change_seqno
666
667         self._inc_table = None
668         self._inc_column = None
669         self._inc_where = None
670
671         self._inserted_rows = {}  # Map from UUID to _InsertedRow
672
673     def add_comment(self, comment):
674         """Appens 'comment' to the comments that will be passed to the OVSDB
675         server when this transaction is committed.  (The comment will be
676         committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
677         relatively human-readable form.)"""
678         self._comments.append(comment)
679
680     def increment(self, table, column, where):
681         assert not self._inc_table
682         self._inc_table = table
683         self._inc_column = column
684         self._inc_where = where
685
686     def wait(self, poller):
687         if self._status not in (Transaction.UNCOMMITTED,
688                                 Transaction.INCOMPLETE):
689             poller.immediate_wake()
690
691     def _substitute_uuids(self, json):
692         if type(json) in (list, tuple):
693             if (len(json) == 2
694                 and json[0] == 'uuid'
695                 and ovs.ovsuuid.is_valid_string(json[1])):
696                 uuid = ovs.ovsuuid.from_string(json[1])
697                 row = self._txn_rows.get(uuid, None)
698                 if row and row._data is None:
699                     return ["named-uuid", _uuid_name_from_uuid(uuid)]
700         return json
701
702     def __disassemble(self):
703         self.idl.txn = None
704
705         for row in self._txn_rows.itervalues():
706             if row._changes is None:
707                 row._table.rows[row.uuid] = row
708             elif row._data is None:
709                 del row._table.rows[row.uuid]
710             row.__dict__["_changes"] = {}
711             row.__dict__["_prereqs"] = {}
712         self._txn_rows = {}
713
714     def commit(self):
715         """Attempts to commit this transaction and returns the status of the
716         commit operation, one of the constants declared as class attributes.
717         If the return value is Transaction.INCOMPLETE, then the transaction is
718         not yet complete and the caller should try calling again later, after
719         calling Idl.run() to run the Idl.
720
721         Committing a transaction rolls back all of the changes that it made to
722         the Idl's copy of the database.  If the transaction commits
723         successfully, then the database server will send an update and, thus,
724         the Idl will be updated with the committed changes."""
725         # The status can only change if we're the active transaction.
726         # (Otherwise, our status will change only in Idl.run().)
727         if self != self.idl.txn:
728             return self._status
729
730         # If we need a lock but don't have it, give up quickly.
731         if self.idl.lock_name and not self.idl.has_lock():
732             self._status = Transaction.NOT_LOCKED
733             self.__disassemble()
734             return self._status
735
736         operations = [self.idl._db.name]
737
738         # Assert that we have the required lock (avoiding a race).
739         if self.idl.lock_name:
740             operations.append({"op": "assert",
741                                "lock": self.idl.lock_name})
742
743         # Add prerequisites and declarations of new rows.
744         for row in self._txn_rows.itervalues():
745             if row._prereqs:
746                 rows = {}
747                 columns = []
748                 for column_name in row._prereqs:
749                     columns.append(column_name)
750                     rows[column_name] = row._data[column_name].to_json()
751                 operations.append({"op": "wait",
752                                    "table": row._table.name,
753                                    "timeout": 0,
754                                    "where": _where_uuid_equals(row.uuid),
755                                    "until": "==",
756                                    "columns": columns,
757                                    "rows": [rows]})
758
759         # Add updates.
760         any_updates = False
761         for row in self._txn_rows.itervalues():
762             if row._changes is None:
763                 if row._table.is_root:
764                     operations.append({"op": "delete",
765                                        "table": row._table.name,
766                                        "where": _where_uuid_equals(row.uuid)})
767                     any_updates = True
768                 else:
769                     # Let ovsdb-server decide whether to really delete it.
770                     pass
771             elif row._changes:
772                 op = {"table": row._table.name}
773                 if row._data is None:
774                     op["op"] = "insert"
775                     op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
776                     any_updates = True
777
778                     op_index = len(operations) - 1
779                     self._inserted_rows[row.uuid] = _InsertedRow(op_index)
780                 else:
781                     op["op"] = "update"
782                     op["where"] = _where_uuid_equals(row.uuid)
783
784                 row_json = {}
785                 op["row"] = row_json
786
787                 for column_name, datum in row._changes.iteritems():
788                     if row._data is not None or not datum.is_default():
789                         row_json[column_name] = (
790                                 self._substitute_uuids(datum.to_json()))
791
792                         # If anything really changed, consider it an update.
793                         # We can't suppress not-really-changed values earlier
794                         # or transactions would become nonatomic (see the big
795                         # comment inside Transaction._write()).
796                         if (not any_updates and row._data is not None and
797                             row._data[column_name] != datum):
798                             any_updates = True
799
800                 if row._data is None or row_json:
801                     operations.append(op)
802
803         # Add increment.
804         if self._inc_table and any_updates:
805             self._inc_index = len(operations) - 1
806
807             operations.append({"op": "mutate",
808                                "table": self._inc_table,
809                                "where": self._substitute_uuids(
810                                    self._inc_where),
811                                "mutations": [[self._inc_column, "+=", 1]]})
812             operations.append({"op": "select",
813                                "table": self._inc_table,
814                                "where": self._substitute_uuids(
815                                    self._inc_where),
816                                "columns": [self._inc_column]})
817
818         # Add comment.
819         if self._comments:
820             operations.append({"op": "comment",
821                                "comment": "\n".join(self._comments)})
822
823         # Dry run?
824         if self.dry_run:
825             operations.append({"op": "abort"})
826
827         if not any_updates:
828             self._status = Transaction.UNCHANGED
829         else:
830             msg = ovs.jsonrpc.Message.create_request("transact", operations)
831             self._request_id = msg.id
832             if not self.idl._session.send(msg):
833                 self.idl._outstanding_txns[self._request_id] = self
834                 self._status = Transaction.INCOMPLETE
835             else:
836                 self._status = Transaction.AGAIN_WAIT
837
838         self.__disassemble()
839         return self._status
840
841     def commit_block(self):
842         while True:
843             status = self.commit()
844             if status != Transaction.INCOMPLETE:
845                 return status
846
847             self.idl.run()
848
849             poller = ovs.poller.Poller()
850             self.idl.wait(poller)
851             self.wait(poller)
852             poller.block()
853
854     def get_increment_new_value(self):
855         assert self._status == Transaction.SUCCESS
856         return self._inc_new_value
857
858     def abort(self):
859         """Aborts this transaction.  If Transaction.commit() has already been
860         called then the transaction might get committed anyhow."""
861         self.__disassemble()
862         if self._status in (Transaction.UNCOMMITTED,
863                             Transaction.INCOMPLETE):
864             self._status = Transaction.ABORTED
865
866     def get_error(self):
867         """Returns a string representing this transaction's current status,
868         suitable for use in log messages."""
869         if self._status != Transaction.ERROR:
870             return Transaction.status_to_string(self._status)
871         elif self._error:
872             return self._error
873         else:
874             return "no error details available"
875
876     def __set_error_json(self, json):
877         if self._error is None:
878             self._error = ovs.json.to_string(json)
879
880     def get_insert_uuid(self, uuid):
881         """Finds and returns the permanent UUID that the database assigned to a
882         newly inserted row, given the UUID that Transaction.insert() assigned
883         locally to that row.
884
885         Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
886         or if it was assigned by that function and then deleted by Row.delete()
887         within the same transaction.  (Rows that are inserted and then deleted
888         within a single transaction are never sent to the database server, so
889         it never assigns them a permanent UUID.)
890
891         This transaction must have completed successfully."""
892         assert self._status in (Transaction.SUCCESS,
893                                 Transaction.UNCHANGED)
894         inserted_row = self._inserted_rows.get(uuid)
895         if inserted_row:
896             return inserted_row.real
897         return None
898
899     def _write(self, row, column, datum):
900         assert row._changes is not None
901
902         txn = row._idl.txn
903
904         # If this is a write-only column and the datum being written is the
905         # same as the one already there, just skip the update entirely.  This
906         # is worth optimizing because we have a lot of columns that get
907         # periodically refreshed into the database but don't actually change
908         # that often.
909         #
910         # We don't do this for read/write columns because that would break
911         # atomicity of transactions--some other client might have written a
912         # different value in that column since we read it.  (But if a whole
913         # transaction only does writes of existing values, without making any
914         # real changes, we will drop the whole transaction later in
915         # ovsdb_idl_txn_commit().)
916         if not column.alert and row._data.get(column.name) == datum:
917             new_value = row._changes.get(column.name)
918             if new_value is None or new_value == datum:
919                 return
920
921         txn._txn_rows[row.uuid] = row
922         row._changes[column.name] = datum.copy()
923
924     def insert(self, table, new_uuid=None):
925         """Inserts and returns a new row in 'table', which must be one of the
926         ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
927
928         The new row is assigned a provisional UUID.  If 'uuid' is None then one
929         is randomly generated; otherwise 'uuid' should specify a randomly
930         generated uuid.UUID not otherwise in use.  ovsdb-server will assign a
931         different UUID when 'txn' is committed, but the IDL will replace any
932         uses of the provisional UUID in the data to be to be committed by the
933         UUID assigned by ovsdb-server."""
934         assert self._status == Transaction.UNCOMMITTED
935         if new_uuid is None:
936             new_uuid = uuid.uuid4()
937         row = Row(self.idl, table, new_uuid, None)
938         table.rows[row.uuid] = row
939         self._txn_rows[row.uuid] = row
940         return row
941
942     def _process_reply(self, msg):
943         if msg.type == ovs.jsonrpc.Message.T_ERROR:
944             self._status = Transaction.ERROR
945         elif type(msg.result) not in (list, tuple):
946             # XXX rate-limit
947             vlog.warn('reply to "transact" is not JSON array')
948         else:
949             hard_errors = False
950             soft_errors = False
951             lock_errors = False
952
953             ops = msg.result
954             for op in ops:
955                 if op is None:
956                     # This isn't an error in itself but indicates that some
957                     # prior operation failed, so make sure that we know about
958                     # it.
959                     soft_errors = True
960                 elif type(op) == dict:
961                     error = op.get("error")
962                     if error is not None:
963                         if error == "timed out":
964                             soft_errors = True
965                         elif error == "not owner":
966                             lock_errors = True
967                         elif error == "aborted":
968                             pass
969                         else:
970                             hard_errors = True
971                             self.__set_error_json(op)
972                 else:
973                     hard_errors = True
974                     self.__set_error_json(op)
975                     # XXX rate-limit
976                     vlog.warn("operation reply is not JSON null or object")
977
978             if not soft_errors and not hard_errors and not lock_errors:
979                 if self._inc_table and not self.__process_inc_reply(ops):
980                     hard_errors = True
981
982                 for insert in self._inserted_rows.itervalues():
983                     if not self.__process_insert_reply(insert, ops):
984                         hard_errors = True
985
986             if hard_errors:
987                 self._status = Transaction.ERROR
988             elif lock_errors:
989                 self._status = Transaction.NOT_LOCKED
990             elif soft_errors:
991                 if self._commit_seqno == self.idl.change_seqno:
992                     self._status = Transaction.AGAIN_WAIT
993                 else:
994                     self._status = Transaction.AGAIN_NOW
995             else:
996                 self._status = Transaction.SUCCESS
997
998     @staticmethod
999     def __check_json_type(json, types, name):
1000         if not json:
1001             # XXX rate-limit
1002             vlog.warn("%s is missing" % name)
1003             return False
1004         elif type(json) not in types:
1005             # XXX rate-limit
1006             vlog.warn("%s has unexpected type %s" % (name, type(json)))
1007             return False
1008         else:
1009             return True
1010
1011     def __process_inc_reply(self, ops):
1012         if self._inc_index + 2 > len(ops):
1013             # XXX rate-limit
1014             vlog.warn("reply does not contain enough operations for "
1015                       "increment (has %d, needs %d)" %
1016                       (len(ops), self._inc_index + 2))
1017
1018         # We know that this is a JSON object because the loop in
1019         # __process_reply() already checked.
1020         mutate = ops[self._inc_index]
1021         count = mutate.get("count")
1022         if not Transaction.__check_json_type(count, (int, long),
1023                                              '"mutate" reply "count"'):
1024             return False
1025         if count != 1:
1026             # XXX rate-limit
1027             vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
1028             return False
1029
1030         select = ops[self._inc_index + 1]
1031         rows = select.get("rows")
1032         if not Transaction.__check_json_type(rows, (list, tuple),
1033                                              '"select" reply "rows"'):
1034             return False
1035         if len(rows) != 1:
1036             # XXX rate-limit
1037             vlog.warn('"select" reply "rows" has %d elements '
1038                       'instead of 1' % len(rows))
1039             return False
1040         row = rows[0]
1041         if not Transaction.__check_json_type(row, (dict,),
1042                                              '"select" reply row'):
1043             return False
1044         column = row.get(self._inc_column)
1045         if not Transaction.__check_json_type(column, (int, long),
1046                                              '"select" reply inc column'):
1047             return False
1048         self._inc_new_value = column
1049         return True
1050
1051     def __process_insert_reply(self, insert, ops):
1052         if insert.op_index >= len(ops):
1053             # XXX rate-limit
1054             vlog.warn("reply does not contain enough operations "
1055                       "for insert (has %d, needs %d)"
1056                       % (len(ops), insert.op_index))
1057             return False
1058
1059         # We know that this is a JSON object because the loop in
1060         # __process_reply() already checked.
1061         reply = ops[insert.op_index]
1062         json_uuid = reply.get("uuid")
1063         if not Transaction.__check_json_type(json_uuid, (tuple, list),
1064                                              '"insert" reply "uuid"'):
1065             return False
1066
1067         try:
1068             uuid_ = ovs.ovsuuid.from_json(json_uuid)
1069         except error.Error:
1070             # XXX rate-limit
1071             vlog.warn('"insert" reply "uuid" is not a JSON UUID')
1072             return False
1073
1074         insert.real = uuid_
1075         return True