python: Style cleanup.
[sliver-openvswitch.git] / python / ovs / db / idl.py
1 # Copyright (c) 2009, 2010, 2011 Nicira Networks
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import logging
16 import uuid
17
18 import ovs.jsonrpc
19 import ovs.db.parser
20 import ovs.db.schema
21 from ovs.db import error
22 import ovs.ovsuuid
23 import ovs.poller
24
25 __pychecker__ = 'no-classattr no-objattrs'
26
27
28 class Idl:
29     """Open vSwitch Database Interface Definition Language (OVSDB IDL).
30
31     The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC
32     requests to an OVSDB database server and parses the responses, converting
33     raw JSON into data structures that are easier for clients to digest.
34
35     The IDL also assists with issuing database transactions.  The client
36     creates a transaction, manipulates the IDL data structures, and commits or
37     aborts the transaction.  The IDL then composes and issues the necessary
38     JSON-RPC requests and reports to the client whether the transaction
39     completed successfully.
40
41     The client is allowed to access the following attributes directly, in a
42     read-only fashion:
43
44     - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
45       to the Idl constructor.  Each ovs.db.schema.TableSchema in the map is
46       annotated with a new attribute 'rows', which is a dict from a uuid.UUID
47       to a Row object.
48
49       The client may directly read and write the Row objects referenced by the
50       'rows' map values.  Refer to Row for more details.
51
52     - 'change_seqno': A number that represents the IDL's state.  When the IDL
53       is updated (by Idl.run()), its value changes.
54
55     - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
56       if no lock is configured.
57
58     - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
59       lock, and False otherwise.
60
61       Locking and unlocking happens asynchronously from the database client's
62       point of view, so the information is only useful for optimization
63       (e.g. if the client doesn't have the lock then there's no point in trying
64       to write to the database).
65
66     - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
67       the database server has indicated that some other client already owns the
68       requested lock, and False otherwise.
69
70     - 'txn': The ovs.db.idl.Transaction object for the database transaction
71       currently being constructed, if there is one, or None otherwise.
72 """
73
74     def __init__(self, remote, schema):
75         """Creates and returns a connection to the database named 'db_name' on
76         'remote', which should be in a form acceptable to
77         ovs.jsonrpc.session.open().  The connection will maintain an in-memory
78         replica of the remote database.
79
80         'schema' should be the schema for the remote database.  The caller may
81         have cut it down by removing tables or columns that are not of
82         interest.  The IDL will only replicate the tables and columns that
83         remain.  The caller may also add a attribute named 'alert' to selected
84         remaining columns, setting its value to False; if so, then changes to
85         those columns will not be considered changes to the database for the
86         purpose of the return value of Idl.run() and Idl.change_seqno.  This is
87         useful for columns that the IDL's client will write but not read.
88
89         The IDL uses and modifies 'schema' directly."""
90
91         self.tables = schema.tables
92         self._db = schema
93         self._session = ovs.jsonrpc.Session.open(remote)
94         self._monitor_request_id = None
95         self._last_seqno = None
96         self.change_seqno = 0
97
98         # Database locking.
99         self.lock_name = None          # Name of lock we need, None if none.
100         self.has_lock = False          # Has db server said we have the lock?
101         self.is_lock_contended = False  # Has db server said we can't get lock?
102         self._lock_request_id = None   # JSON-RPC ID of in-flight lock request.
103
104         # Transaction support.
105         self.txn = None
106         self._outstanding_txns = {}
107
108         for table in schema.tables.itervalues():
109             for column in table.columns.itervalues():
110                 if not hasattr(column, 'alert'):
111                     column.alert = True
112             table.need_table = False
113             table.rows = {}
114             table.idl = self
115
116     def close(self):
117         """Closes the connection to the database.  The IDL will no longer
118         update."""
119         self._session.close()
120
121     def run(self):
122         """Processes a batch of messages from the database server.  Returns
123         True if the database as seen through the IDL changed, False if it did
124         not change.  The initial fetch of the entire contents of the remote
125         database is considered to be one kind of change.  If the IDL has been
126         configured to acquire a database lock (with Idl.set_lock()), then
127         successfully acquiring the lock is also considered to be a change.
128
129         This function can return occasional false positives, that is, report
130         that the database changed even though it didn't.  This happens if the
131         connection to the database drops and reconnects, which causes the
132         database contents to be reloaded even if they didn't change.  (It could
133         also happen if the database server sends out a "change" that reflects
134         what we already thought was in the database, but the database server is
135         not supposed to do that.)
136
137         As an alternative to checking the return value, the client may check
138         for changes in self.change_seqno."""
139         assert not self.txn
140         initial_change_seqno = self.change_seqno
141         self._session.run()
142         i = 0
143         while i < 50:
144             i += 1
145             if not self._session.is_connected():
146                 break
147
148             seqno = self._session.get_seqno()
149             if seqno != self._last_seqno:
150                 self._last_seqno = seqno
151                 self.__txn_abort_all()
152                 self.__send_monitor_request()
153                 if self.lock_name:
154                     self.__send_lock_request()
155                 break
156
157             msg = self._session.recv()
158             if msg is None:
159                 break
160             if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
161                 and msg.method == "update"
162                 and len(msg.params) == 2
163                 and msg.params[0] == None):
164                 # Database contents changed.
165                 self.__parse_update(msg.params[1])
166             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
167                   and self._monitor_request_id is not None
168                   and self._monitor_request_id == msg.id):
169                 # Reply to our "monitor" request.
170                 try:
171                     self.change_seqno += 1
172                     self._monitor_request_id = None
173                     self.__clear()
174                     self.__parse_update(msg.result)
175                 except error.Error, e:
176                     logging.error("%s: parse error in received schema: %s"
177                                   % (self._session.get_name(), e))
178                     self.__error()
179             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
180                   and self._lock_request_id is not None
181                   and self._lock_request_id == msg.id):
182                 # Reply to our "lock" request.
183                 self.__parse_lock_reply(msg.result)
184             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
185                   and msg.method == "locked"):
186                 # We got our lock.
187                 self.__parse_lock_notify(msg.params, True)
188             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
189                   and msg.method == "stolen"):
190                 # Someone else stole our lock.
191                 self.__parse_lock_notify(msg.params, False)
192             elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
193                 # Reply to our echo request.  Ignore it.
194                 pass
195             elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
196                                ovs.jsonrpc.Message.T_REPLY)
197                   and self.__txn_process_reply(msg)):
198                 # __txn_process_reply() did everything needed.
199                 pass
200             else:
201                 # This can happen if a transaction is destroyed before we
202                 # receive the reply, so keep the log level low.
203                 logging.debug("%s: received unexpected %s message"
204                               % (self._session.get_name(),
205                                  ovs.jsonrpc.Message.type_to_string(msg.type)))
206
207         return initial_change_seqno != self.change_seqno
208
209     def wait(self, poller):
210         """Arranges for poller.block() to wake up when self.run() has something
211         to do or when activity occurs on a transaction on 'self'."""
212         self._session.wait(poller)
213         self._session.recv_wait(poller)
214
215     def has_ever_connected(self):
216         """Returns True, if the IDL successfully connected to the remote
217         database and retrieved its contents (even if the connection
218         subsequently dropped and is in the process of reconnecting).  If so,
219         then the IDL contains an atomic snapshot of the database's contents
220         (but it might be arbitrarily old if the connection dropped).
221
222         Returns False if the IDL has never connected or retrieved the
223         database's contents.  If so, the IDL is empty."""
224         return self.change_seqno != 0
225
226     def force_reconnect(self):
227         """Forces the IDL to drop its connection to the database and reconnect.
228         In the meantime, the contents of the IDL will not change."""
229         self._session.force_reconnect()
230
231     def set_lock(self, lock_name):
232         """If 'lock_name' is not None, configures the IDL to obtain the named
233         lock from the database server and to avoid modifying the database when
234         the lock cannot be acquired (that is, when another client has the same
235         lock).
236
237         If 'lock_name' is None, drops the locking requirement and releases the
238         lock."""
239         assert not self.txn
240         assert not self._outstanding_txns
241
242         if self.lock_name and (not lock_name or lock_name != self.lock_name):
243             # Release previous lock.
244             self.__send_unlock_request()
245             self.lock_name = None
246             self.is_lock_contended = False
247
248         if lock_name and not self.lock_name:
249             # Acquire new lock.
250             self.lock_name = lock_name
251             self.__send_lock_request()
252
253     def __clear(self):
254         changed = False
255
256         for table in self.tables.itervalues():
257             if table.rows:
258                 changed = True
259                 table.rows = {}
260
261         if changed:
262             self.change_seqno += 1
263
264     def __update_has_lock(self, new_has_lock):
265         if new_has_lock and not self.has_lock:
266             if self._monitor_request_id is None:
267                 self.change_seqno += 1
268             else:
269                 # We're waiting for a monitor reply, so don't signal that the
270                 # database changed.  The monitor reply will increment
271                 # change_seqno anyhow.
272                 pass
273             self.is_lock_contended = False
274         self.has_lock = new_has_lock
275
276     def __do_send_lock_request(self, method):
277         self.__update_has_lock(False)
278         self._lock_request_id = None
279         if self._session.is_connected():
280             msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
281             msg_id = msg.id
282             self._session.send(msg)
283         else:
284             msg_id = None
285         return msg_id
286
287     def __send_lock_request(self):
288         self._lock_request_id = self.__do_send_lock_request("lock")
289
290     def __send_unlock_request(self):
291         self.__do_send_lock_request("unlock")
292
293     def __parse_lock_reply(self, result):
294         self._lock_request_id = None
295         got_lock = type(result) == dict and result.get("locked") is True
296         self.__update_has_lock(got_lock)
297         if not got_lock:
298             self.is_lock_contended = True
299
300     def __parse_lock_notify(self, params, new_has_lock):
301         if (self.lock_name is not None
302             and type(params) in (list, tuple)
303             and params
304             and params[0] == self.lock_name):
305             self.__update_has_lock(self, new_has_lock)
306             if not new_has_lock:
307                 self.is_lock_contended = True
308
309     def __send_monitor_request(self):
310         monitor_requests = {}
311         for table in self.tables.itervalues():
312             monitor_requests[table.name] = {"columns": table.columns.keys()}
313         msg = ovs.jsonrpc.Message.create_request(
314             "monitor", [self._db.name, None, monitor_requests])
315         self._monitor_request_id = msg.id
316         self._session.send(msg)
317
318     def __parse_update(self, update):
319         try:
320             self.__do_parse_update(update)
321         except error.Error, e:
322             logging.error("%s: error parsing update: %s"
323                           % (self._session.get_name(), e))
324
325     def __do_parse_update(self, table_updates):
326         if type(table_updates) != dict:
327             raise error.Error("<table-updates> is not an object",
328                               table_updates)
329
330         for table_name, table_update in table_updates.iteritems():
331             table = self.tables.get(table_name)
332             if not table:
333                 raise error.Error('<table-updates> includes unknown '
334                                   'table "%s"' % table_name)
335
336             if type(table_update) != dict:
337                 raise error.Error('<table-update> for table "%s" is not '
338                                   'an object' % table_name, table_update)
339
340             for uuid_string, row_update in table_update.iteritems():
341                 if not ovs.ovsuuid.is_valid_string(uuid_string):
342                     raise error.Error('<table-update> for table "%s" '
343                                       'contains bad UUID "%s" as member '
344                                       'name' % (table_name, uuid_string),
345                                       table_update)
346                 uuid = ovs.ovsuuid.from_string(uuid_string)
347
348                 if type(row_update) != dict:
349                     raise error.Error('<table-update> for table "%s" '
350                                       'contains <row-update> for %s that '
351                                       'is not an object'
352                                       % (table_name, uuid_string))
353
354                 parser = ovs.db.parser.Parser(row_update, "row-update")
355                 old = parser.get_optional("old", [dict])
356                 new = parser.get_optional("new", [dict])
357                 parser.finish()
358
359                 if not old and not new:
360                     raise error.Error('<row-update> missing "old" and '
361                                       '"new" members', row_update)
362
363                 if self.__process_update(table, uuid, old, new):
364                     self.change_seqno += 1
365
366     def __process_update(self, table, uuid, old, new):
367         """Returns True if a column changed, False otherwise."""
368         row = table.rows.get(uuid)
369         changed = False
370         if not new:
371             # Delete row.
372             if row:
373                 del table.rows[uuid]
374                 changed = True
375             else:
376                 # XXX rate-limit
377                 logging.warning("cannot delete missing row %s from table %s"
378                                 % (uuid, table.name))
379         elif not old:
380             # Insert row.
381             if not row:
382                 row = self.__create_row(table, uuid)
383                 changed = True
384             else:
385                 # XXX rate-limit
386                 logging.warning("cannot add existing row %s to table %s"
387                                 % (uuid, table.name))
388             if self.__row_update(table, row, new):
389                 changed = True
390         else:
391             if not row:
392                 row = self.__create_row(table, uuid)
393                 changed = True
394                 # XXX rate-limit
395                 logging.warning("cannot modify missing row %s in table %s"
396                                 % (uuid, table.name))
397             if self.__row_update(table, row, new):
398                 changed = True
399         return changed
400
401     def __row_update(self, table, row, row_json):
402         changed = False
403         for column_name, datum_json in row_json.iteritems():
404             column = table.columns.get(column_name)
405             if not column:
406                 # XXX rate-limit
407                 logging.warning("unknown column %s updating table %s"
408                                 % (column_name, table.name))
409                 continue
410
411             try:
412                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
413             except error.Error, e:
414                 # XXX rate-limit
415                 logging.warning("error parsing column %s in table %s: %s"
416                                 % (column_name, table.name, e))
417                 continue
418
419             if datum != row._data[column_name]:
420                 row._data[column_name] = datum
421                 if column.alert:
422                     changed = True
423             else:
424                 # Didn't really change but the OVSDB monitor protocol always
425                 # includes every value in a row.
426                 pass
427         return changed
428
429     def __create_row(self, table, uuid):
430         data = {}
431         for column in table.columns.itervalues():
432             data[column.name] = ovs.db.data.Datum.default(column.type)
433         row = table.rows[uuid] = Row(self, table, uuid, data)
434         return row
435
436     def __error(self):
437         self._session.force_reconnect()
438
439     def __txn_abort_all(self):
440         while self._outstanding_txns:
441             txn = self._outstanding_txns.popitem()[1]
442             txn._status = Transaction.TRY_AGAIN
443
444     def __txn_process_reply(self, msg):
445         txn = self._outstanding_txns.pop(msg.id, None)
446         if txn:
447             txn._process_reply(msg)
448
449
450 def _uuid_to_row(atom, base):
451     if base.ref_table:
452         return base.ref_table.rows.get(atom)
453     else:
454         return atom
455
456
457 def _row_to_uuid(value):
458     if type(value) == Row:
459         return value.uuid
460     else:
461         return value
462
463
464 class Row(object):
465     """A row within an IDL.
466
467     The client may access the following attributes directly:
468
469     - 'uuid': a uuid.UUID object whose value is the row's database UUID.
470
471     - An attribute for each column in the Row's table, named for the column,
472       whose values are as returned by Datum.to_python() for the column's type.
473
474       If some error occurs (e.g. the database server's idea of the column is
475       different from the IDL's idea), then the attribute values is the
476       "default" value return by Datum.default() for the column's type.  (It is
477       important to know this because the default value may violate constraints
478       for the column's type, e.g. the default integer value is 0 even if column
479       contraints require the column's value to be positive.)
480
481       When a transaction is active, column attributes may also be assigned new
482       values.  Committing the transaction will then cause the new value to be
483       stored into the database.
484
485       *NOTE*: In the current implementation, the value of a column is a *copy*
486       of the value in the database.  This means that modifying its value
487       directly will have no useful effect.  For example, the following:
488         row.mycolumn["a"] = "b"              # don't do this
489       will not change anything in the database, even after commit.  To modify
490       the column, instead assign the modified column value back to the column:
491         d = row.mycolumn
492         d["a"] = "b"
493         row.mycolumn = d
494 """
495     def __init__(self, idl, table, uuid, data):
496         # All of the explicit references to self.__dict__ below are required
497         # to set real attributes with invoking self.__getattr__().
498         self.__dict__["uuid"] = uuid
499
500         self.__dict__["_idl"] = idl
501         self.__dict__["_table"] = table
502
503         # _data is the committed data.  It takes the following values:
504         #
505         #   - A dictionary that maps every column name to a Datum, if the row
506         #     exists in the committed form of the database.
507         #
508         #   - None, if this row is newly inserted within the active transaction
509         #     and thus has no committed form.
510         self.__dict__["_data"] = data
511
512         # _changes describes changes to this row within the active transaction.
513         # It takes the following values:
514         #
515         #   - {}, the empty dictionary, if no transaction is active or if the
516         #     row has yet not been changed within this transaction.
517         #
518         #   - A dictionary that maps a column name to its new Datum, if an
519         #     active transaction changes those columns' values.
520         #
521         #   - A dictionary that maps every column name to a Datum, if the row
522         #     is newly inserted within the active transaction.
523         #
524         #   - None, if this transaction deletes this row.
525         self.__dict__["_changes"] = {}
526
527         # A dictionary whose keys are the names of columns that must be
528         # verified as prerequisites when the transaction commits.  The values
529         # in the dictionary are all None.
530         self.__dict__["_prereqs"] = {}
531
532     def __getattr__(self, column_name):
533         assert self._changes is not None
534
535         datum = self._changes.get(column_name)
536         if datum is None:
537             datum = self._data[column_name]
538
539         return datum.to_python(_uuid_to_row)
540
541     def __setattr__(self, column_name, value):
542         assert self._changes is not None
543         assert self._idl.txn
544
545         column = self._table.columns[column_name]
546         try:
547             datum = ovs.db.data.Datum.from_python(column.type, value,
548                                                   _row_to_uuid)
549         except error.Error, e:
550             # XXX rate-limit
551             logging.error("attempting to write bad value to column %s (%s)"
552                           % (column_name, e))
553             return
554         self._idl.txn._write(self, column, datum)
555
556     def verify(self, column_name):
557         """Causes the original contents of column 'column_name' in this row to
558         be verified as a prerequisite to completing the transaction.  That is,
559         if 'column_name' changed in this row (or if this row was deleted)
560         between the time that the IDL originally read its contents and the time
561         that the transaction commits, then the transaction aborts and
562         Transaction.commit() returns Transaction.TRY_AGAIN.
563
564         The intention is that, to ensure that no transaction commits based on
565         dirty reads, an application should call Row.verify() on each data item
566         read as part of a read-modify-write operation.
567
568         In some cases Row.verify() reduces to a no-op, because the current
569         value of the column is already known:
570
571           - If this row is a row created by the current transaction (returned
572             by Transaction.insert()).
573
574           - If the column has already been modified within the current
575             transaction.
576
577         Because of the latter property, always call Row.verify() *before*
578         modifying the column, for a given read-modify-write.
579
580         A transaction must be in progress."""
581         assert self._idl.txn
582         assert self._changes is not None
583         if not self._data or column_name in self._changes:
584             return
585
586         self._prereqs[column_name] = None
587
588     def delete(self):
589         """Deletes this row from its table.
590
591         A transaction must be in progress."""
592         assert self._idl.txn
593         assert self._changes is not None
594         if self._data is None:
595             del self._idl.txn._txn_rows[self.uuid]
596         self.__dict__["_changes"] = None
597         del self._table.rows[self.uuid]
598
599
600 def _uuid_name_from_uuid(uuid):
601     return "row%s" % str(uuid).replace("-", "_")
602
603
604 def _where_uuid_equals(uuid):
605     return [["_uuid", "==", ["uuid", str(uuid)]]]
606
607
608 class _InsertedRow(object):
609     def __init__(self, op_index):
610         self.op_index = op_index
611         self.real = None
612
613
614 class Transaction(object):
615     # Status values that Transaction.commit() can return.
616     UNCOMMITTED = "uncommitted"  # Not yet committed or aborted.
617     UNCHANGED = "unchanged"      # Transaction didn't include any changes.
618     INCOMPLETE = "incomplete"    # Commit in progress, please wait.
619     ABORTED = "aborted"          # ovsdb_idl_txn_abort() called.
620     SUCCESS = "success"          # Commit successful.
621     TRY_AGAIN = "try again"      # Commit failed because a "verify" operation
622                                  # reported an inconsistency, due to a network
623                                  # problem, or other transient failure.
624     NOT_LOCKED = "not locked"    # Server hasn't given us the lock yet.
625     ERROR = "error"              # Commit failed due to a hard error.
626
627     @staticmethod
628     def status_to_string(status):
629         """Converts one of the status values that Transaction.commit() can
630         return into a human-readable string.
631
632         (The status values are in fact such strings already, so
633         there's nothing to do.)"""
634         return status
635
636     def __init__(self, idl):
637         """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
638         A given Idl may only have a single active transaction at a time.
639
640         A Transaction may modify the contents of a database by assigning new
641         values to columns (attributes of Row), deleting rows (with
642         Row.delete()), or inserting rows (with Transaction.insert()).  It may
643         also check that columns in the database have not changed with
644         Row.verify().
645
646         When a transaction is complete (which must be before the next call to
647         Idl.run()), call Transaction.commit() or Transaction.abort()."""
648         assert idl.txn is None
649
650         idl.txn = self
651         self._request_id = None
652         self.idl = idl
653         self.dry_run = False
654         self._txn_rows = {}
655         self._status = Transaction.UNCOMMITTED
656         self._error = None
657         self._comments = []
658
659         self._inc_table = None
660         self._inc_column = None
661         self._inc_where = None
662
663         self._inserted_rows = {}  # Map from UUID to _InsertedRow
664
665     def add_comment(self, comment):
666         """Appens 'comment' to the comments that will be passed to the OVSDB
667         server when this transaction is committed.  (The comment will be
668         committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
669         relatively human-readable form.)"""
670         self._comments.append(comment)
671
672     def increment(self, table, column, where):
673         assert not self._inc_table
674         self._inc_table = table
675         self._inc_column = column
676         self._inc_where = where
677
678     def wait(self, poller):
679         if self._status not in (Transaction.UNCOMMITTED,
680                                 Transaction.INCOMPLETE):
681             poller.immediate_wake()
682
683     def _substitute_uuids(self, json):
684         if type(json) in (list, tuple):
685             if (len(json) == 2
686                 and json[0] == 'uuid'
687                 and ovs.ovsuuid.is_valid_string(json[1])):
688                 uuid = ovs.ovsuuid.from_string(json[1])
689                 row = self._txn_rows.get(uuid, None)
690                 if row and row._data is None:
691                     return ["named-uuid", _uuid_name_from_uuid(uuid)]
692         return json
693
694     def __disassemble(self):
695         self.idl.txn = None
696
697         for row in self._txn_rows.itervalues():
698             if row._changes is None:
699                 row._table.rows[row.uuid] = row
700             elif row._data is None:
701                 del row._table.rows[row.uuid]
702             row.__dict__["_changes"] = {}
703             row.__dict__["_prereqs"] = {}
704         self._txn_rows = {}
705
706     def commit(self):
707         """Attempts to commit this transaction and returns the status of the
708         commit operation, one of the constants declared as class attributes.
709         If the return value is Transaction.INCOMPLETE, then the transaction is
710         not yet complete and the caller should try calling again later, after
711         calling Idl.run() to run the Idl.
712
713         Committing a transaction rolls back all of the changes that it made to
714         the Idl's copy of the database.  If the transaction commits
715         successfully, then the database server will send an update and, thus,
716         the Idl will be updated with the committed changes."""
717         # The status can only change if we're the active transaction.
718         # (Otherwise, our status will change only in Idl.run().)
719         if self != self.idl.txn:
720             return self._status
721
722         # If we need a lock but don't have it, give up quickly.
723         if self.idl.lock_name and not self.idl.has_lock():
724             self._status = Transaction.NOT_LOCKED
725             self.__disassemble()
726             return self._status
727
728         operations = [self.idl._db.name]
729
730         # Assert that we have the required lock (avoiding a race).
731         if self.idl.lock_name:
732             operations.append({"op": "assert",
733                                "lock": self.idl.lock_name})
734
735         # Add prerequisites and declarations of new rows.
736         for row in self._txn_rows.itervalues():
737             if row._prereqs:
738                 rows = {}
739                 columns = []
740                 for column_name in row._prereqs:
741                     columns.append(column_name)
742                     rows[column_name] = row._data[column_name].to_json()
743                 operations.append({"op": "wait",
744                                    "table": row._table.name,
745                                    "timeout": 0,
746                                    "where": _where_uuid_equals(row.uuid),
747                                    "until": "==",
748                                    "columns": columns,
749                                    "rows": [rows]})
750
751         # Add updates.
752         any_updates = False
753         for row in self._txn_rows.itervalues():
754             if row._changes is None:
755                 if row._table.is_root:
756                     operations.append({"op": "delete",
757                                        "table": row._table.name,
758                                        "where": _where_uuid_equals(row.uuid)})
759                     any_updates = True
760                 else:
761                     # Let ovsdb-server decide whether to really delete it.
762                     pass
763             elif row._changes:
764                 op = {"table": row._table.name}
765                 if row._data is None:
766                     op["op"] = "insert"
767                     op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
768                     any_updates = True
769
770                     op_index = len(operations) - 1
771                     self._inserted_rows[row.uuid] = _InsertedRow(op_index)
772                 else:
773                     op["op"] = "update"
774                     op["where"] = _where_uuid_equals(row.uuid)
775
776                 row_json = {}
777                 op["row"] = row_json
778
779                 for column_name, datum in row._changes.iteritems():
780                     if row._data is not None or not datum.is_default():
781                         row_json[column_name] = (
782                                 self._substitute_uuids(datum.to_json()))
783
784                         # If anything really changed, consider it an update.
785                         # We can't suppress not-really-changed values earlier
786                         # or transactions would become nonatomic (see the big
787                         # comment inside Transaction._write()).
788                         if (not any_updates and row._data is not None and
789                             row._data[column_name] != datum):
790                             any_updates = True
791
792                 if row._data is None or row_json:
793                     operations.append(op)
794
795         # Add increment.
796         if self._inc_table and any_updates:
797             self._inc_index = len(operations) - 1
798
799             operations.append({"op": "mutate",
800                                "table": self._inc_table,
801                                "where": self._substitute_uuids(
802                                    self._inc_where),
803                                "mutations": [[self._inc_column, "+=", 1]]})
804             operations.append({"op": "select",
805                                "table": self._inc_table,
806                                "where": self._substitute_uuids(
807                                    self._inc_where),
808                                "columns": [self._inc_column]})
809
810         # Add comment.
811         if self._comments:
812             operations.append({"op": "comment",
813                                "comment": "\n".join(self._comments)})
814
815         # Dry run?
816         if self.dry_run:
817             operations.append({"op": "abort"})
818
819         if not any_updates:
820             self._status = Transaction.UNCHANGED
821         else:
822             msg = ovs.jsonrpc.Message.create_request("transact", operations)
823             self._request_id = msg.id
824             if not self.idl._session.send(msg):
825                 self.idl._outstanding_txns[self._request_id] = self
826                 self._status = Transaction.INCOMPLETE
827             else:
828                 self._status = Transaction.TRY_AGAIN
829
830         self.__disassemble()
831         return self._status
832
833     def commit_block(self):
834         while True:
835             status = self.commit()
836             if status != Transaction.INCOMPLETE:
837                 return status
838
839             self.idl.run()
840
841             poller = ovs.poller.Poller()
842             self.idl.wait(poller)
843             self.wait(poller)
844             poller.block()
845
846     def get_increment_new_value(self):
847         assert self._status == Transaction.SUCCESS
848         return self._inc_new_value
849
850     def abort(self):
851         """Aborts this transaction.  If Transaction.commit() has already been
852         called then the transaction might get committed anyhow."""
853         self.__disassemble()
854         if self._status in (Transaction.UNCOMMITTED,
855                             Transaction.INCOMPLETE):
856             self._status = Transaction.ABORTED
857
858     def get_error(self):
859         """Returns a string representing this transaction's current status,
860         suitable for use in log messages."""
861         if self._status != Transaction.ERROR:
862             return Transaction.status_to_string(self._status)
863         elif self._error:
864             return self._error
865         else:
866             return "no error details available"
867
868     def __set_error_json(self, json):
869         if self._error is None:
870             self._error = ovs.json.to_string(json)
871
872     def get_insert_uuid(self, uuid):
873         """Finds and returns the permanent UUID that the database assigned to a
874         newly inserted row, given the UUID that Transaction.insert() assigned
875         locally to that row.
876
877         Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
878         or if it was assigned by that function and then deleted by Row.delete()
879         within the same transaction.  (Rows that are inserted and then deleted
880         within a single transaction are never sent to the database server, so
881         it never assigns them a permanent UUID.)
882
883         This transaction must have completed successfully."""
884         assert self._status in (Transaction.SUCCESS,
885                                 Transaction.UNCHANGED)
886         inserted_row = self._inserted_rows.get(uuid)
887         if inserted_row:
888             return inserted_row.real
889         return None
890
891     def _write(self, row, column, datum):
892         assert row._changes is not None
893
894         txn = row._idl.txn
895
896         # If this is a write-only column and the datum being written is the
897         # same as the one already there, just skip the update entirely.  This
898         # is worth optimizing because we have a lot of columns that get
899         # periodically refreshed into the database but don't actually change
900         # that often.
901         #
902         # We don't do this for read/write columns because that would break
903         # atomicity of transactions--some other client might have written a
904         # different value in that column since we read it.  (But if a whole
905         # transaction only does writes of existing values, without making any
906         # real changes, we will drop the whole transaction later in
907         # ovsdb_idl_txn_commit().)
908         if not column.alert and row._data.get(column.name) == datum:
909             new_value = row._changes.get(column.name)
910             if new_value is None or new_value == datum:
911                 return
912
913         txn._txn_rows[row.uuid] = row
914         row._changes[column.name] = datum.copy()
915
916     def insert(self, table, new_uuid=None):
917         """Inserts and returns a new row in 'table', which must be one of the
918         ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
919
920         The new row is assigned a provisional UUID.  If 'uuid' is None then one
921         is randomly generated; otherwise 'uuid' should specify a randomly
922         generated uuid.UUID not otherwise in use.  ovsdb-server will assign a
923         different UUID when 'txn' is committed, but the IDL will replace any
924         uses of the provisional UUID in the data to be to be committed by the
925         UUID assigned by ovsdb-server."""
926         assert self._status == Transaction.UNCOMMITTED
927         if new_uuid is None:
928             new_uuid = uuid.uuid4()
929         row = Row(self.idl, table, new_uuid, None)
930         table.rows[row.uuid] = row
931         self._txn_rows[row.uuid] = row
932         return row
933
934     def _process_reply(self, msg):
935         if msg.type == ovs.jsonrpc.Message.T_ERROR:
936             self._status = Transaction.ERROR
937         elif type(msg.result) not in (list, tuple):
938             # XXX rate-limit
939             logging.warning('reply to "transact" is not JSON array')
940         else:
941             hard_errors = False
942             soft_errors = False
943             lock_errors = False
944
945             ops = msg.result
946             for op in ops:
947                 if op is None:
948                     # This isn't an error in itself but indicates that some
949                     # prior operation failed, so make sure that we know about
950                     # it.
951                     soft_errors = True
952                 elif type(op) == dict:
953                     error = op.get("error")
954                     if error is not None:
955                         if error == "timed out":
956                             soft_errors = True
957                         elif error == "not owner":
958                             lock_errors = True
959                         elif error == "aborted":
960                             pass
961                         else:
962                             hard_errors = True
963                             self.__set_error_json(op)
964                 else:
965                     hard_errors = True
966                     self.__set_error_json(op)
967                     # XXX rate-limit
968                     logging.warning("operation reply is not JSON null or "
969                                     "object")
970
971             if not soft_errors and not hard_errors and not lock_errors:
972                 if self._inc_table and not self.__process_inc_reply(ops):
973                     hard_errors = True
974
975                 for insert in self._inserted_rows.itervalues():
976                     if not self.__process_insert_reply(insert, ops):
977                         hard_errors = True
978
979             if hard_errors:
980                 self._status = Transaction.ERROR
981             elif lock_errors:
982                 self._status = Transaction.NOT_LOCKED
983             elif soft_errors:
984                 self._status = Transaction.TRY_AGAIN
985             else:
986                 self._status = Transaction.SUCCESS
987
988     @staticmethod
989     def __check_json_type(json, types, name):
990         if not json:
991             # XXX rate-limit
992             logging.warning("%s is missing" % name)
993             return False
994         elif type(json) not in types:
995             # XXX rate-limit
996             logging.warning("%s has unexpected type %s" % (name, type(json)))
997             return False
998         else:
999             return True
1000
1001     def __process_inc_reply(self, ops):
1002         if self._inc_index + 2 > len(ops):
1003             # XXX rate-limit
1004             logging.warning("reply does not contain enough operations for "
1005                             "increment (has %d, needs %d)" %
1006                             (len(ops), self._inc_index + 2))
1007
1008         # We know that this is a JSON object because the loop in
1009         # __process_reply() already checked.
1010         mutate = ops[self._inc_index]
1011         count = mutate.get("count")
1012         if not Transaction.__check_json_type(count, (int, long),
1013                                              '"mutate" reply "count"'):
1014             return False
1015         if count != 1:
1016             # XXX rate-limit
1017             logging.warning('"mutate" reply "count" is %d instead of 1'
1018                             % count)
1019             return False
1020
1021         select = ops[self._inc_index + 1]
1022         rows = select.get("rows")
1023         if not Transaction.__check_json_type(rows, (list, tuple),
1024                                              '"select" reply "rows"'):
1025             return False
1026         if len(rows) != 1:
1027             # XXX rate-limit
1028             logging.warning('"select" reply "rows" has %d elements '
1029                             'instead of 1' % len(rows))
1030             return False
1031         row = rows[0]
1032         if not Transaction.__check_json_type(row, (dict,),
1033                                              '"select" reply row'):
1034             return False
1035         column = row.get(self._inc_column)
1036         if not Transaction.__check_json_type(column, (int, long),
1037                                              '"select" reply inc column'):
1038             return False
1039         self._inc_new_value = column
1040         return True
1041
1042     def __process_insert_reply(self, insert, ops):
1043         if insert.op_index >= len(ops):
1044             # XXX rate-limit
1045             logging.warning("reply does not contain enough operations "
1046                             "for insert (has %d, needs %d)"
1047                             % (len(ops), insert.op_index))
1048             return False
1049
1050         # We know that this is a JSON object because the loop in
1051         # __process_reply() already checked.
1052         reply = ops[insert.op_index]
1053         json_uuid = reply.get("uuid")
1054         if not Transaction.__check_json_type(json_uuid, (tuple, list),
1055                                              '"insert" reply "uuid"'):
1056             return False
1057
1058         try:
1059             uuid_ = ovs.ovsuuid.from_json(json_uuid)
1060         except error.Error:
1061             # XXX rate-limit
1062             logging.warning('"insert" reply "uuid" is not a JSON UUID')
1063             return False
1064
1065         insert.real = uuid_
1066         return True