python: Implement write support in Python IDL for OVSDB.
[sliver-openvswitch.git] / python / ovs / db / idl.py
1 # Copyright (c) 2009, 2010, 2011 Nicira Networks
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import logging
16 import uuid
17
18 import ovs.jsonrpc
19 import ovs.db.parser
20 import ovs.db.schema
21 from ovs.db import error
22 import ovs.ovsuuid
23 import ovs.poller
24
25 class Idl:
26     """Open vSwitch Database Interface Definition Language (OVSDB IDL).
27
28     The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC
29     requests to an OVSDB database server and parses the responses, converting
30     raw JSON into data structures that are easier for clients to digest.
31
32     The IDL also assists with issuing database transactions.  The client
33     creates a transaction, manipulates the IDL data structures, and commits or
34     aborts the transaction.  The IDL then composes and issues the necessary
35     JSON-RPC requests and reports to the client whether the transaction
36     completed successfully.
37
38     The client is allowed to access the following attributes directly, in a
39     read-only fashion:
40
41     - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
42       to the Idl constructor.  Each ovs.db.schema.TableSchema in the map is
43       annotated with a new attribute 'rows', which is a dict from a uuid.UUID
44       to a Row object.
45
46       The client may directly read and write the Row objects referenced by the
47       'rows' map values.  Refer to Row for more details.
48
49     - 'change_seqno': A number that represents the IDL's state.  When the IDL
50       is updated (by Idl.run()), its value changes.
51
52     - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
53       if no lock is configured.
54
55     - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
56       lock, and False otherwise.
57
58       Locking and unlocking happens asynchronously from the database client's
59       point of view, so the information is only useful for optimization
60       (e.g. if the client doesn't have the lock then there's no point in trying
61       to write to the database).
62
63     - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
64       the database server has indicated that some other client already owns the
65       requested lock, and False otherwise.
66
67     - 'txn': The ovs.db.idl.Transaction object for the database transaction
68       currently being constructed, if there is one, or None otherwise.
69 """
70
71     def __init__(self, remote, schema):
72         """Creates and returns a connection to the database named 'db_name' on
73         'remote', which should be in a form acceptable to
74         ovs.jsonrpc.session.open().  The connection will maintain an in-memory
75         replica of the remote database.
76
77         'schema' should be the schema for the remote database.  The caller may
78         have cut it down by removing tables or columns that are not of
79         interest.  The IDL will only replicate the tables and columns that
80         remain.  The caller may also add a attribute named 'alert' to selected
81         remaining columns, setting its value to False; if so, then changes to
82         those columns will not be considered changes to the database for the
83         purpose of the return value of Idl.run() and Idl.change_seqno.  This is
84         useful for columns that the IDL's client will write but not read.
85
86         The IDL uses and modifies 'schema' directly."""
87
88         self.tables = schema.tables
89         self._db = schema
90         self._session = ovs.jsonrpc.Session.open(remote)
91         self._monitor_request_id = None
92         self._last_seqno = None
93         self.change_seqno = 0
94
95         # Database locking.
96         self.lock_name = None          # Name of lock we need, None if none.
97         self.has_lock = False          # Has db server said we have the lock?
98         self.is_lock_contended = False # Has db server said we can't get lock?
99         self._lock_request_id = None   # JSON-RPC ID of in-flight lock request.
100
101         # Transaction support.
102         self.txn = None
103         self._outstanding_txns = {}
104
105         for table in schema.tables.itervalues():
106             for column in table.columns.itervalues():
107                 if not hasattr(column, 'alert'):
108                     column.alert = True
109             table.need_table = False
110             table.rows = {}
111             table.idl = self
112
113     def close(self):
114         """Closes the connection to the database.  The IDL will no longer
115         update."""
116         self._session.close()
117
118     def run(self):
119         """Processes a batch of messages from the database server.  Returns
120         True if the database as seen through the IDL changed, False if it did
121         not change.  The initial fetch of the entire contents of the remote
122         database is considered to be one kind of change.  If the IDL has been
123         configured to acquire a database lock (with Idl.set_lock()), then
124         successfully acquiring the lock is also considered to be a change.
125
126         This function can return occasional false positives, that is, report
127         that the database changed even though it didn't.  This happens if the
128         connection to the database drops and reconnects, which causes the
129         database contents to be reloaded even if they didn't change.  (It could
130         also happen if the database server sends out a "change" that reflects
131         what we already thought was in the database, but the database server is
132         not supposed to do that.)
133
134         As an alternative to checking the return value, the client may check
135         for changes in self.change_seqno."""
136         assert not self.txn
137         initial_change_seqno = self.change_seqno
138         self._session.run()
139         i = 0
140         while i < 50:
141             i += 1
142             if not self._session.is_connected():
143                 break
144
145             seqno = self._session.get_seqno()
146             if seqno != self._last_seqno:
147                 self._last_seqno = seqno
148                 self.__txn_abort_all()
149                 self.__send_monitor_request()
150                 if self.lock_name:
151                     self.__send_lock_request()
152                 break
153
154             msg = self._session.recv()
155             if msg is None:
156                 break
157             if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
158                 and msg.method == "update"
159                 and len(msg.params) == 2
160                 and msg.params[0] == None):
161                 # Database contents changed.
162                 self.__parse_update(msg.params[1])
163             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
164                   and self._monitor_request_id is not None
165                   and self._monitor_request_id == msg.id):
166                 # Reply to our "monitor" request.
167                 try:
168                     self.change_seqno += 1
169                     self._monitor_request_id = None
170                     self.__clear()
171                     self.__parse_update(msg.result)
172                 except error.Error, e:
173                     logging.error("%s: parse error in received schema: %s"
174                                   % (self._session.get_name(), e))
175                     self.__error()
176             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
177                   and self._lock_request_id is not None
178                   and self._lock_request_id == msg.id):
179                 # Reply to our "lock" request.
180                 self.__parse_lock_reply(msg.result)
181             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
182                   and msg.method == "locked"):
183                 # We got our lock.
184                 self.__parse_lock_notify(msg.params, True)
185             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
186                   and msg.method == "stolen"):
187                 # Someone else stole our lock.
188                 self.__parse_lock_notify(msg.params, False)
189             elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
190                 # Reply to our echo request.  Ignore it.
191                 pass
192             elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
193                                ovs.jsonrpc.Message.T_REPLY)
194                   and self.__txn_process_reply(msg)):
195                 # __txn_process_reply() did everything needed.
196                 pass
197             else:
198                 # This can happen if a transaction is destroyed before we
199                 # receive the reply, so keep the log level low.
200                 logging.debug("%s: received unexpected %s message"
201                               % (self._session.get_name(),
202                                  ovs.jsonrpc.Message.type_to_string(msg.type)))
203
204         return initial_change_seqno != self.change_seqno
205
206     def wait(self, poller):
207         """Arranges for poller.block() to wake up when self.run() has something
208         to do or when activity occurs on a transaction on 'self'."""
209         self._session.wait(poller)
210         self._session.recv_wait(poller)
211
212     def has_ever_connected(self):
213         """Returns True, if the IDL successfully connected to the remote
214         database and retrieved its contents (even if the connection
215         subsequently dropped and is in the process of reconnecting).  If so,
216         then the IDL contains an atomic snapshot of the database's contents
217         (but it might be arbitrarily old if the connection dropped).
218
219         Returns False if the IDL has never connected or retrieved the
220         database's contents.  If so, the IDL is empty."""
221         return self.change_seqno != 0
222
223     def force_reconnect(self):
224         """Forces the IDL to drop its connection to the database and reconnect.
225         In the meantime, the contents of the IDL will not change."""
226         self._session.force_reconnect()
227
228     def set_lock(self, lock_name):
229         """If 'lock_name' is not None, configures the IDL to obtain the named
230         lock from the database server and to avoid modifying the database when
231         the lock cannot be acquired (that is, when another client has the same
232         lock).
233
234         If 'lock_name' is None, drops the locking requirement and releases the
235         lock."""
236         assert not self.txn
237         assert not self._outstanding_txns
238
239         if self.lock_name and (not lock_name or lock_name != self.lock_name):
240             # Release previous lock.
241             self.__send_unlock_request()
242             self.lock_name = None
243             self.is_lock_contended = False
244
245         if lock_name and not self.lock_name:
246             # Acquire new lock.
247             self.lock_name = lock_name
248             self.__send_lock_request()
249
250     def __clear(self):
251         changed = False
252
253         for table in self.tables.itervalues():
254             if table.rows:
255                 changed = True
256                 table.rows = {}
257
258         if changed:
259             self.change_seqno += 1
260
261     def __update_has_lock(self, new_has_lock):
262         if new_has_lock and not self.has_lock:
263             if self._monitor_request_id is None:
264                 self.change_seqno += 1
265             else:
266                 # We're waiting for a monitor reply, so don't signal that the
267                 # database changed.  The monitor reply will increment
268                 # change_seqno anyhow.
269                 pass
270             self.is_lock_contended = False
271         self.has_lock = new_has_lock
272
273     def __do_send_lock_request(self, method):
274         self.__update_has_lock(False)
275         self._lock_request_id = None
276         if self._session.is_connected():
277             msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
278             msg_id = msg.id
279             self._session.send(msg)
280         else:
281             msg_id = None
282         return msg_id
283
284     def __send_lock_request(self):
285         self._lock_request_id = self.__do_send_lock_request("lock")
286
287     def __send_unlock_request(self):
288         self.__do_send_lock_request("unlock")
289
290     def __parse_lock_reply(self, result):
291         self._lock_request_id = None
292         got_lock = type(result) == dict and result.get("locked") is True
293         self.__update_has_lock(got_lock)
294         if not got_lock:
295             self.is_lock_contended = True
296
297     def __parse_lock_notify(self, params, new_has_lock):
298         if (self.lock_name is not None
299             and type(params) in (list, tuple)
300             and params
301             and params[0] == self.lock_name):
302             self.__update_has_lock(self, new_has_lock)
303             if not new_has_lock:
304                 self.is_lock_contended = True
305
306     def __send_monitor_request(self):
307         monitor_requests = {}
308         for table in self.tables.itervalues():
309             monitor_requests[table.name] = {"columns": table.columns.keys()}
310         msg = ovs.jsonrpc.Message.create_request(
311             "monitor", [self._db.name, None, monitor_requests])
312         self._monitor_request_id = msg.id
313         self._session.send(msg)
314
315     def __parse_update(self, update):
316         try:
317             self.__do_parse_update(update)
318         except error.Error, e:
319             logging.error("%s: error parsing update: %s"
320                           % (self._session.get_name(), e))
321
322     def __do_parse_update(self, table_updates):
323         if type(table_updates) != dict:
324             raise error.Error("<table-updates> is not an object",
325                               table_updates)
326
327         for table_name, table_update in table_updates.iteritems():
328             table = self.tables.get(table_name)
329             if not table:
330                 raise error.Error('<table-updates> includes unknown '
331                                   'table "%s"' % table_name)
332
333             if type(table_update) != dict:
334                 raise error.Error('<table-update> for table "%s" is not '
335                                   'an object' % table_name, table_update)
336
337             for uuid_string, row_update in table_update.iteritems():
338                 if not ovs.ovsuuid.is_valid_string(uuid_string):
339                     raise error.Error('<table-update> for table "%s" '
340                                       'contains bad UUID "%s" as member '
341                                       'name' % (table_name, uuid_string),
342                                       table_update)
343                 uuid = ovs.ovsuuid.from_string(uuid_string)
344
345                 if type(row_update) != dict:
346                     raise error.Error('<table-update> for table "%s" '
347                                       'contains <row-update> for %s that '
348                                       'is not an object'
349                                       % (table_name, uuid_string))
350
351                 parser = ovs.db.parser.Parser(row_update, "row-update")
352                 old = parser.get_optional("old", [dict])
353                 new = parser.get_optional("new", [dict])
354                 parser.finish()
355
356                 if not old and not new:
357                     raise error.Error('<row-update> missing "old" and '
358                                       '"new" members', row_update)
359
360                 if self.__process_update(table, uuid, old, new):
361                     self.change_seqno += 1
362
363     def __process_update(self, table, uuid, old, new):
364         """Returns True if a column changed, False otherwise."""
365         row = table.rows.get(uuid)
366         changed = False
367         if not new:
368             # Delete row.
369             if row:
370                 del table.rows[uuid]
371                 changed = True
372             else:
373                 # XXX rate-limit
374                 logging.warning("cannot delete missing row %s from table %s"
375                                 % (uuid, table.name))
376         elif not old:
377             # Insert row.
378             if not row:
379                 row = self.__create_row(table, uuid)
380                 changed = True
381             else:
382                 # XXX rate-limit
383                 logging.warning("cannot add existing row %s to table %s"
384                                 % (uuid, table.name))
385             if self.__row_update(table, row, new):
386                 changed = True
387         else:
388             if not row:
389                 row = self.__create_row(table, uuid)
390                 changed = True
391                 # XXX rate-limit
392                 logging.warning("cannot modify missing row %s in table %s"
393                                 % (uuid, table.name))
394             if self.__row_update(table, row, new):
395                 changed = True
396         return changed
397
398     def __row_update(self, table, row, row_json):
399         changed = False
400         for column_name, datum_json in row_json.iteritems():
401             column = table.columns.get(column_name)
402             if not column:
403                 # XXX rate-limit
404                 logging.warning("unknown column %s updating table %s"
405                                 % (column_name, table.name))
406                 continue
407
408             try:
409                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
410             except error.Error, e:
411                 # XXX rate-limit
412                 logging.warning("error parsing column %s in table %s: %s"
413                                 % (column_name, table.name, e))
414                 continue
415
416             if datum != row._data[column_name]:
417                 row._data[column_name] = datum
418                 if column.alert:
419                     changed = True
420             else:
421                 # Didn't really change but the OVSDB monitor protocol always
422                 # includes every value in a row.
423                 pass
424         return changed
425
426     def __create_row(self, table, uuid):
427         data = {}
428         for column in table.columns.itervalues():
429             data[column.name] = ovs.db.data.Datum.default(column.type)
430         row = table.rows[uuid] = Row(self, table, uuid, data)
431         return row
432
433     def __error(self):
434         self._session.force_reconnect()
435
436     def __txn_abort_all(self):
437         while self._outstanding_txns:
438             txn = self._outstanding_txns.popitem()[1]
439             txn._status = Transaction.TRY_AGAIN
440
441     def __txn_process_reply(self, msg):
442         txn = self._outstanding_txns.pop(msg.id, None)
443         if txn:
444             txn._process_reply(msg)
445
446 def _uuid_to_row(atom, base):
447     if base.ref_table:
448         return base.ref_table.rows.get(atom)
449     else:
450         return atom
451
452 def _row_to_uuid(value):
453     if type(value) == Row:
454         return value.uuid
455     else:
456         return value
457
458 class Row(object):
459     """A row within an IDL.
460
461     The client may access the following attributes directly:
462
463     - 'uuid': a uuid.UUID object whose value is the row's database UUID.
464
465     - An attribute for each column in the Row's table, named for the column,
466       whose values are as returned by Datum.to_python() for the column's type.
467
468       If some error occurs (e.g. the database server's idea of the column is
469       different from the IDL's idea), then the attribute values is the
470       "default" value return by Datum.default() for the column's type.  (It is
471       important to know this because the default value may violate constraints
472       for the column's type, e.g. the default integer value is 0 even if column
473       contraints require the column's value to be positive.)
474
475       When a transaction is active, column attributes may also be assigned new
476       values.  Committing the transaction will then cause the new value to be
477       stored into the database.
478
479       *NOTE*: In the current implementation, the value of a column is a *copy*
480       of the value in the database.  This means that modifying its value
481       directly will have no useful effect.  For example, the following:
482         row.mycolumn["a"] = "b"              # don't do this
483       will not change anything in the database, even after commit.  To modify
484       the column, instead assign the modified column value back to the column:
485         d = row.mycolumn
486         d["a"] = "b"
487         row.mycolumn = d
488 """
489     def __init__(self, idl, table, uuid, data):
490         # All of the explicit references to self.__dict__ below are required
491         # to set real attributes with invoking self.__getattr__().
492         self.__dict__["uuid"] = uuid
493
494         self.__dict__["_idl"] = idl
495         self.__dict__["_table"] = table
496
497         # _data is the committed data.  It takes the following values:
498         #
499         #   - A dictionary that maps every column name to a Datum, if the row
500         #     exists in the committed form of the database.
501         #
502         #   - None, if this row is newly inserted within the active transaction
503         #     and thus has no committed form.
504         self.__dict__["_data"] = data
505
506         # _changes describes changes to this row within the active transaction.
507         # It takes the following values:
508         #
509         #   - {}, the empty dictionary, if no transaction is active or if the
510         #     row has yet not been changed within this transaction.
511         #
512         #   - A dictionary that maps a column name to its new Datum, if an
513         #     active transaction changes those columns' values.
514         #
515         #   - A dictionary that maps every column name to a Datum, if the row
516         #     is newly inserted within the active transaction.
517         #
518         #   - None, if this transaction deletes this row.
519         self.__dict__["_changes"] = {}
520
521         # A dictionary whose keys are the names of columns that must be
522         # verified as prerequisites when the transaction commits.  The values
523         # in the dictionary are all None.
524         self.__dict__["_prereqs"] = {}
525
526     def __getattr__(self, column_name):
527         assert self._changes is not None
528
529         datum = self._changes.get(column_name)
530         if datum is None:
531             datum = self._data[column_name]
532
533         return datum.to_python(_uuid_to_row)
534
535     def __setattr__(self, column_name, value):
536         assert self._changes is not None
537         assert self._idl.txn
538
539         column = self._table.columns[column_name]
540         try:
541             datum = ovs.db.data.Datum.from_python(column.type, value,
542                                                   _row_to_uuid)
543         except error.Error, e:
544             # XXX rate-limit
545             logging.error("attempting to write bad value to column %s (%s)"
546                           % (column_name, e))
547             return
548         self._idl.txn._write(self, column, datum)
549
550     def verify(self, column_name):
551         """Causes the original contents of column 'column_name' in this row to
552         be verified as a prerequisite to completing the transaction.  That is,
553         if 'column_name' changed in this row (or if this row was deleted)
554         between the time that the IDL originally read its contents and the time
555         that the transaction commits, then the transaction aborts and
556         Transaction.commit() returns Transaction.TRY_AGAIN.
557
558         The intention is that, to ensure that no transaction commits based on
559         dirty reads, an application should call Row.verify() on each data item
560         read as part of a read-modify-write operation.
561
562         In some cases Row.verify() reduces to a no-op, because the current
563         value of the column is already known:
564
565           - If this row is a row created by the current transaction (returned
566             by Transaction.insert()).
567
568           - If the column has already been modified within the current
569             transaction.
570
571         Because of the latter property, always call Row.verify() *before*
572         modifying the column, for a given read-modify-write.
573
574         A transaction must be in progress."""
575         assert self._idl.txn
576         assert self._changes is not None
577         if not self._data or column_name in self._changes:
578             return
579
580         self._prereqs[column_name] = None
581
582     def delete(self):
583         """Deletes this row from its table.
584
585         A transaction must be in progress."""
586         assert self._idl.txn
587         assert self._changes is not None
588         if self._data is None:
589             del self._idl.txn._txn_rows[self.uuid]
590         self.__dict__["_changes"] = None
591         del self._table.rows[self.uuid]
592
593 def _uuid_name_from_uuid(uuid):
594     return "row%s" % str(uuid).replace("-", "_")
595
596 def _where_uuid_equals(uuid):
597     return [["_uuid", "==", ["uuid", str(uuid)]]]
598
599 class _InsertedRow(object):
600     def __init__(self, op_index):
601         self.op_index = op_index
602         self.real = None
603
604 class Transaction(object):
605     # Status values that Transaction.commit() can return.
606     UNCOMMITTED = "uncommitted" # Not yet committed or aborted.
607     UNCHANGED = "unchanged"     # Transaction didn't include any changes.
608     INCOMPLETE = "incomplete"   # Commit in progress, please wait.
609     ABORTED = "aborted"         # ovsdb_idl_txn_abort() called.
610     SUCCESS = "success"         # Commit successful.
611     TRY_AGAIN = "try again"     # Commit failed because a "verify" operation
612                                 # reported an inconsistency, due to a network
613                                 # problem, or other transient failure.
614     NOT_LOCKED = "not locked"   # Server hasn't given us the lock yet.
615     ERROR = "error"             # Commit failed due to a hard error.
616
617     @staticmethod
618     def status_to_string(status):
619         """Converts one of the status values that Transaction.commit() can
620         return into a human-readable string.
621
622         (The status values are in fact such strings already, so
623         there's nothing to do.)"""
624         return status
625
626     def __init__(self, idl):
627         """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
628         A given Idl may only have a single active transaction at a time.
629
630         A Transaction may modify the contents of a database by assigning new
631         values to columns (attributes of Row), deleting rows (with
632         Row.delete()), or inserting rows (with Transaction.insert()).  It may
633         also check that columns in the database have not changed with
634         Row.verify().
635
636         When a transaction is complete (which must be before the next call to
637         Idl.run()), call Transaction.commit() or Transaction.abort()."""
638         assert idl.txn is None
639
640         idl.txn = self
641         self._request_id = None
642         self.idl = idl
643         self.dry_run = False
644         self._txn_rows = {}
645         self._status = Transaction.UNCOMMITTED
646         self._error = None
647         self._comments = []
648
649         self._inc_table = None
650         self._inc_column = None
651         self._inc_where = None
652
653         self._inserted_rows = {} # Map from UUID to _InsertedRow
654
655     def add_comment(self, comment):
656         """Appens 'comment' to the comments that will be passed to the OVSDB
657         server when this transaction is committed.  (The comment will be
658         committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
659         relatively human-readable form.)"""
660         self._comments.append(comment)
661
662     def increment(self, table, column, where):
663         assert not self._inc_table
664         self._inc_table = table
665         self._inc_column = column
666         self._inc_where = where
667
668     def wait(self, poller):
669         if self._status not in (Transaction.UNCOMMITTED,
670                                 Transaction.INCOMPLETE):
671             poller.immediate_wake()
672
673     def _substitute_uuids(self, json):
674         if type(json) in (list, tuple):
675             if (len(json) == 2
676                 and json[0] == 'uuid'
677                 and ovs.ovsuuid.is_valid_string(json[1])):
678                 uuid = ovs.ovsuuid.from_string(json[1])
679                 row = self._txn_rows.get(uuid, None)
680                 if row and row._data is None:
681                     return ["named-uuid", _uuid_name_from_uuid(uuid)]
682         return json
683
684     def __disassemble(self):
685         self.idl.txn = None
686
687         for row in self._txn_rows.itervalues():
688             if row._changes is None:
689                 row._table.rows[row.uuid] = row
690             elif row._data is None:
691                 del row._table.rows[row.uuid]
692             row.__dict__["_changes"] = {}
693             row.__dict__["_prereqs"] = {}
694         self._txn_rows = {}
695
696     def commit(self):
697         """Attempts to commit this transaction and returns the status of the
698         commit operation, one of the constants declared as class attributes.
699         If the return value is Transaction.INCOMPLETE, then the transaction is
700         not yet complete and the caller should try calling again later, after
701         calling Idl.run() to run the Idl.
702
703         Committing a transaction rolls back all of the changes that it made to
704         the Idl's copy of the database.  If the transaction commits
705         successfully, then the database server will send an update and, thus,
706         the Idl will be updated with the committed changes."""
707         # The status can only change if we're the active transaction.
708         # (Otherwise, our status will change only in Idl.run().)
709         if self != self.idl.txn:
710             return self._status
711
712         # If we need a lock but don't have it, give up quickly.
713         if self.idl.lock_name and not self.idl.has_lock():
714             self._status = Transaction.NOT_LOCKED
715             self.__disassemble()
716             return self._status
717
718         operations = [self.idl._db.name]
719
720         # Assert that we have the required lock (avoiding a race).
721         if self.idl.lock_name:
722             operations.append({"op": "assert",
723                                "lock": self.idl.lock_name})
724
725         # Add prerequisites and declarations of new rows.
726         for row in self._txn_rows.itervalues():
727             if row._prereqs:
728                 rows = {}
729                 columns = []
730                 for column_name in row._prereqs:
731                     columns.append(column_name)
732                     rows[column_name] = row._data[column_name].to_json()
733                 operations.append({"op": "wait",
734                                    "table": row._table.name,
735                                    "timeout": 0,
736                                    "where": _where_uuid_equals(row.uuid),
737                                    "until": "==",
738                                    "columns": columns,
739                                    "rows": [rows]})
740
741         # Add updates.
742         any_updates = False
743         for row in self._txn_rows.itervalues():
744             if row._changes is None:
745                 if row._table.is_root:
746                     operations.append({"op": "delete",
747                                        "table": row._table.name,
748                                        "where": _where_uuid_equals(row.uuid)})
749                     any_updates = True
750                 else:
751                     # Let ovsdb-server decide whether to really delete it.
752                     pass
753             elif row._changes:
754                 op = {"table": row._table.name}
755                 if row._data is None:
756                     op["op"] = "insert"
757                     op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
758                     any_updates = True
759
760                     op_index = len(operations) - 1
761                     self._inserted_rows[row.uuid] = _InsertedRow(op_index)
762                 else:
763                     op["op"] = "update"
764                     op["where"] = _where_uuid_equals(row.uuid)
765
766                 row_json = {}
767                 op["row"] = row_json
768
769                 for column_name, datum in row._changes.iteritems():
770                     if row._data is not None or not datum.is_default():
771                         row_json[column_name] = self._substitute_uuids(datum.to_json())
772
773                         # If anything really changed, consider it an update.
774                         # We can't suppress not-really-changed values earlier
775                         # or transactions would become nonatomic (see the big
776                         # comment inside Transaction._write()).
777                         if (not any_updates and row._data is not None and
778                             row._data[column_name] != datum):
779                             any_updates = True
780
781                 if row._data is None or row_json:
782                     operations.append(op)
783
784         # Add increment.
785         if self._inc_table and any_updates:
786             self._inc_index = len(operations) - 1
787
788             operations.append({"op": "mutate",
789                                "table": self._inc_table,
790                                "where": self._substitute_uuids(self._inc_where),
791                                "mutations": [[self._inc_column, "+=", 1]]})
792             operations.append({"op": "select",
793                                "table": self._inc_table,
794                                "where": self._substitute_uuids(self._inc_where),
795                                "columns": [self._inc_column]})
796
797         # Add comment.
798         if self._comments:
799             operations.append({"op": "comment",
800                                "comment": "\n".join(self._comments)})
801
802         # Dry run?
803         if self.dry_run:
804             operations.append({"op": "abort"})
805
806         if not any_updates:
807             self._status = Transaction.UNCHANGED
808         else:
809             msg = ovs.jsonrpc.Message.create_request("transact", operations)
810             self._request_id = msg.id
811             if not self.idl._session.send(msg):
812                 self.idl._outstanding_txns[self._request_id] = self
813                 self._status = Transaction.INCOMPLETE
814             else:
815                 self._status = Transaction.TRY_AGAIN
816
817         self.__disassemble()
818         return self._status
819
820     def commit_block(self):
821         while True:
822             status = self.commit()
823             if status != Transaction.INCOMPLETE:
824                 return status
825
826             self.idl.run()
827
828             poller = ovs.poller.Poller()
829             self.idl.wait(poller)
830             self.wait(poller)
831             poller.block()
832
833     def get_increment_new_value(self):
834         assert self._status == Transaction.SUCCESS
835         return self._inc_new_value
836
837     def abort(self):
838         """Aborts this transaction.  If Transaction.commit() has already been
839         called then the transaction might get committed anyhow."""
840         self.__disassemble()
841         if self._status in (Transaction.UNCOMMITTED,
842                             Transaction.INCOMPLETE):
843             self._status = Transaction.ABORTED
844
845     def get_error(self):
846         """Returns a string representing this transaction's current status,
847         suitable for use in log messages."""
848         if self._status != Transaction.ERROR:
849             return Transaction.status_to_string(self._status)
850         elif self._error:
851             return self._error
852         else:
853             return "no error details available"
854
855     def __set_error_json(self, json):
856         if self._error is None:
857             self._error = ovs.json.to_string(json)
858
859     def get_insert_uuid(self, uuid):
860         """Finds and returns the permanent UUID that the database assigned to a
861         newly inserted row, given the UUID that Transaction.insert() assigned
862         locally to that row.
863
864         Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
865         or if it was assigned by that function and then deleted by Row.delete()
866         within the same transaction.  (Rows that are inserted and then deleted
867         within a single transaction are never sent to the database server, so
868         it never assigns them a permanent UUID.)
869
870         This transaction must have completed successfully."""
871         assert self._status in (Transaction.SUCCESS,
872                                 Transaction.UNCHANGED)
873         inserted_row = self._inserted_rows.get(uuid)
874         if inserted_row:
875             return inserted_row.real
876         return None
877
878     def _write(self, row, column, datum):
879         assert row._changes is not None
880
881         txn = row._idl.txn
882
883         # If this is a write-only column and the datum being written is the
884         # same as the one already there, just skip the update entirely.  This
885         # is worth optimizing because we have a lot of columns that get
886         # periodically refreshed into the database but don't actually change
887         # that often.
888         #
889         # We don't do this for read/write columns because that would break
890         # atomicity of transactions--some other client might have written a
891         # different value in that column since we read it.  (But if a whole
892         # transaction only does writes of existing values, without making any
893         # real changes, we will drop the whole transaction later in
894         # ovsdb_idl_txn_commit().)
895         if not column.alert and row._data.get(column.name) == datum:
896             new_value = row._changes.get(column.name)
897             if new_value is None or new_value == datum:
898                 return
899
900         txn._txn_rows[row.uuid] = row
901         row._changes[column.name] = datum.copy()
902
903     def insert(self, table, new_uuid=None):
904         """Inserts and returns a new row in 'table', which must be one of the
905         ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
906
907         The new row is assigned a provisional UUID.  If 'uuid' is None then one
908         is randomly generated; otherwise 'uuid' should specify a randomly
909         generated uuid.UUID not otherwise in use.  ovsdb-server will assign a
910         different UUID when 'txn' is committed, but the IDL will replace any
911         uses of the provisional UUID in the data to be to be committed by the
912         UUID assigned by ovsdb-server."""
913         assert self._status == Transaction.UNCOMMITTED
914         if new_uuid is None:
915             new_uuid = uuid.uuid4()
916         row = Row(self.idl, table, new_uuid, None)
917         table.rows[row.uuid] = row
918         self._txn_rows[row.uuid] = row
919         return row
920
921     def _process_reply(self, msg):
922         if msg.type == ovs.jsonrpc.Message.T_ERROR:
923             self._status = Transaction.ERROR
924         elif type(msg.result) not in (list, tuple):
925             # XXX rate-limit
926             logging.warning('reply to "transact" is not JSON array')
927         else:
928             hard_errors = False
929             soft_errors = False
930             lock_errors = False
931
932             ops = msg.result
933             for op in ops:
934                 if op is None:
935                     # This isn't an error in itself but indicates that some
936                     # prior operation failed, so make sure that we know about
937                     # it.
938                     soft_errors = True
939                 elif type(op) == dict:
940                     error = op.get("error")
941                     if error is not None:
942                         if error == "timed out":
943                             soft_errors = True
944                         elif error == "not owner":
945                             lock_errors = True
946                         elif error == "aborted":
947                             pass
948                         else:
949                             hard_errors = True
950                             self.__set_error_json(op)
951                 else:
952                     hard_errors = True
953                     self.__set_error_json(op)
954                     # XXX rate-limit
955                     logging.warning("operation reply is not JSON null or "
956                                     "object")
957
958             if not soft_errors and not hard_errors and not lock_errors:
959                 if self._inc_table and not self.__process_inc_reply(ops):
960                     hard_errors = True
961
962                 for insert in self._inserted_rows.itervalues():
963                     if not self.__process_insert_reply(insert, ops):
964                         hard_errors = True
965
966             if hard_errors:
967                 self._status = Transaction.ERROR
968             elif lock_errors:
969                 self._status = Transaction.NOT_LOCKED
970             elif soft_errors:
971                 self._status = Transaction.TRY_AGAIN
972             else:
973                 self._status = Transaction.SUCCESS
974
975     @staticmethod
976     def __check_json_type(json, types, name):
977         if not json:
978             # XXX rate-limit
979             logging.warning("%s is missing" % name)
980             return False
981         elif type(json) not in types:
982             # XXX rate-limit
983             logging.warning("%s has unexpected type %s" % (name, type(json)))
984             return False
985         else:
986             return True
987
988     def __process_inc_reply(self, ops):
989         if self._inc_index + 2 > len(ops):
990             # XXX rate-limit
991             logging.warning("reply does not contain enough operations for "
992                             "increment (has %d, needs %d)" %
993                             (len(ops), self._inc_index + 2))
994
995         # We know that this is a JSON object because the loop in
996         # __process_reply() already checked.
997         mutate = ops[self._inc_index]
998         count = mutate.get("count")
999         if not Transaction.__check_json_type(count, (int, long),
1000                                              '"mutate" reply "count"'):
1001             return False
1002         if count != 1:
1003             # XXX rate-limit
1004             logging.warning('"mutate" reply "count" is %d instead of 1'
1005                             % count)
1006             return False
1007
1008         select = ops[self._inc_index + 1]
1009         rows = select.get("rows")
1010         if not Transaction.__check_json_type(rows, (list, tuple),
1011                                              '"select" reply "rows"'):
1012             return False
1013         if len(rows) != 1:
1014             # XXX rate-limit
1015             logging.warning('"select" reply "rows" has %d elements '
1016                             'instead of 1' % len(rows))
1017             return False
1018         row = rows[0]
1019         if not Transaction.__check_json_type(row, (dict,),
1020                                              '"select" reply row'):
1021             return False
1022         column = row.get(self._inc_column)
1023         if not Transaction.__check_json_type(column, (int, long),
1024                                              '"select" reply inc column'):
1025             return False
1026         self._inc_new_value = column
1027         return True
1028
1029     def __process_insert_reply(self, insert, ops):
1030         if insert.op_index >= len(ops):
1031             # XXX rate-limit
1032             logging.warning("reply does not contain enough operations "
1033                             "for insert (has %d, needs %d)"
1034                             % (len(ops), insert.op_index))
1035             return False
1036
1037         # We know that this is a JSON object because the loop in
1038         # __process_reply() already checked.
1039         reply = ops[insert.op_index]
1040         json_uuid = reply.get("uuid")
1041         if not Transaction.__check_json_type(json_uuid, (tuple, list),
1042                                              '"insert" reply "uuid"'):
1043             return False
1044
1045         try:
1046             uuid_ = ovs.ovsuuid.from_json(json_uuid)
1047         except error.Error:
1048             # XXX rate-limit
1049             logging.warning('"insert" reply "uuid" is not a JSON UUID')
1050             return False
1051
1052         insert.real = uuid_
1053         return True