python.ovs.db.idl: Fix Row.delete() of a row already committed to the db.
[sliver-openvswitch.git] / python / ovs / db / idl.py
1 # Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import uuid
16
17 import ovs.jsonrpc
18 import ovs.db.parser
19 import ovs.db.schema
20 from ovs.db import error
21 import ovs.ovsuuid
22 import ovs.poller
23 import ovs.vlog
24
25 vlog = ovs.vlog.Vlog("idl")
26
27 __pychecker__ = 'no-classattr no-objattrs'
28
29
30 class Idl:
31     """Open vSwitch Database Interface Definition Language (OVSDB IDL).
32
33     The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC
34     requests to an OVSDB database server and parses the responses, converting
35     raw JSON into data structures that are easier for clients to digest.
36
37     The IDL also assists with issuing database transactions.  The client
38     creates a transaction, manipulates the IDL data structures, and commits or
39     aborts the transaction.  The IDL then composes and issues the necessary
40     JSON-RPC requests and reports to the client whether the transaction
41     completed successfully.
42
43     The client is allowed to access the following attributes directly, in a
44     read-only fashion:
45
46     - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
47       to the Idl constructor.  Each ovs.db.schema.TableSchema in the map is
48       annotated with a new attribute 'rows', which is a dict from a uuid.UUID
49       to a Row object.
50
51       The client may directly read and write the Row objects referenced by the
52       'rows' map values.  Refer to Row for more details.
53
54     - 'change_seqno': A number that represents the IDL's state.  When the IDL
55       is updated (by Idl.run()), its value changes.  The sequence number can
56       occasionally change even if the database does not.  This happens if the
57       connection to the database drops and reconnects, which causes the
58       database contents to be reloaded even if they didn't change.  (It could
59       also happen if the database server sends out a "change" that reflects
60       what the IDL already thought was in the database.  The database server is
61       not supposed to do that, but bugs could in theory cause it to do so.)
62
63     - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
64       if no lock is configured.
65
66     - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
67       lock, and False otherwise.
68
69       Locking and unlocking happens asynchronously from the database client's
70       point of view, so the information is only useful for optimization
71       (e.g. if the client doesn't have the lock then there's no point in trying
72       to write to the database).
73
74     - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
75       the database server has indicated that some other client already owns the
76       requested lock, and False otherwise.
77
78     - 'txn': The ovs.db.idl.Transaction object for the database transaction
79       currently being constructed, if there is one, or None otherwise.
80 """
81
82     def __init__(self, remote, schema):
83         """Creates and returns a connection to the database named 'db_name' on
84         'remote', which should be in a form acceptable to
85         ovs.jsonrpc.session.open().  The connection will maintain an in-memory
86         replica of the remote database.
87
88         'schema' should be the schema for the remote database.  The caller may
89         have cut it down by removing tables or columns that are not of
90         interest.  The IDL will only replicate the tables and columns that
91         remain.  The caller may also add a attribute named 'alert' to selected
92         remaining columns, setting its value to False; if so, then changes to
93         those columns will not be considered changes to the database for the
94         purpose of the return value of Idl.run() and Idl.change_seqno.  This is
95         useful for columns that the IDL's client will write but not read.
96
97         As a convenience to users, 'schema' may also be an instance of the
98         SchemaHelper class.
99
100         The IDL uses and modifies 'schema' directly."""
101
102         assert isinstance(schema, SchemaHelper)
103         schema = schema.get_idl_schema()
104
105         self.tables = schema.tables
106         self._db = schema
107         self._session = ovs.jsonrpc.Session.open(remote)
108         self._monitor_request_id = None
109         self._last_seqno = None
110         self.change_seqno = 0
111
112         # Database locking.
113         self.lock_name = None          # Name of lock we need, None if none.
114         self.has_lock = False          # Has db server said we have the lock?
115         self.is_lock_contended = False  # Has db server said we can't get lock?
116         self._lock_request_id = None   # JSON-RPC ID of in-flight lock request.
117
118         # Transaction support.
119         self.txn = None
120         self._outstanding_txns = {}
121
122         for table in schema.tables.itervalues():
123             for column in table.columns.itervalues():
124                 if not hasattr(column, 'alert'):
125                     column.alert = True
126             table.need_table = False
127             table.rows = {}
128             table.idl = self
129
130     def close(self):
131         """Closes the connection to the database.  The IDL will no longer
132         update."""
133         self._session.close()
134
135     def run(self):
136         """Processes a batch of messages from the database server.  Returns
137         True if the database as seen through the IDL changed, False if it did
138         not change.  The initial fetch of the entire contents of the remote
139         database is considered to be one kind of change.  If the IDL has been
140         configured to acquire a database lock (with Idl.set_lock()), then
141         successfully acquiring the lock is also considered to be a change.
142
143         This function can return occasional false positives, that is, report
144         that the database changed even though it didn't.  This happens if the
145         connection to the database drops and reconnects, which causes the
146         database contents to be reloaded even if they didn't change.  (It could
147         also happen if the database server sends out a "change" that reflects
148         what we already thought was in the database, but the database server is
149         not supposed to do that.)
150
151         As an alternative to checking the return value, the client may check
152         for changes in self.change_seqno."""
153         assert not self.txn
154         initial_change_seqno = self.change_seqno
155         self._session.run()
156         i = 0
157         while i < 50:
158             i += 1
159             if not self._session.is_connected():
160                 break
161
162             seqno = self._session.get_seqno()
163             if seqno != self._last_seqno:
164                 self._last_seqno = seqno
165                 self.__txn_abort_all()
166                 self.__send_monitor_request()
167                 if self.lock_name:
168                     self.__send_lock_request()
169                 break
170
171             msg = self._session.recv()
172             if msg is None:
173                 break
174             if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
175                 and msg.method == "update"
176                 and len(msg.params) == 2
177                 and msg.params[0] == None):
178                 # Database contents changed.
179                 self.__parse_update(msg.params[1])
180             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
181                   and self._monitor_request_id is not None
182                   and self._monitor_request_id == msg.id):
183                 # Reply to our "monitor" request.
184                 try:
185                     self.change_seqno += 1
186                     self._monitor_request_id = None
187                     self.__clear()
188                     self.__parse_update(msg.result)
189                 except error.Error, e:
190                     vlog.err("%s: parse error in received schema: %s"
191                               % (self._session.get_name(), e))
192                     self.__error()
193             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
194                   and self._lock_request_id is not None
195                   and self._lock_request_id == msg.id):
196                 # Reply to our "lock" request.
197                 self.__parse_lock_reply(msg.result)
198             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
199                   and msg.method == "locked"):
200                 # We got our lock.
201                 self.__parse_lock_notify(msg.params, True)
202             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
203                   and msg.method == "stolen"):
204                 # Someone else stole our lock.
205                 self.__parse_lock_notify(msg.params, False)
206             elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
207                 # Reply to our echo request.  Ignore it.
208                 pass
209             elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
210                                ovs.jsonrpc.Message.T_REPLY)
211                   and self.__txn_process_reply(msg)):
212                 # __txn_process_reply() did everything needed.
213                 pass
214             else:
215                 # This can happen if a transaction is destroyed before we
216                 # receive the reply, so keep the log level low.
217                 vlog.dbg("%s: received unexpected %s message"
218                          % (self._session.get_name(),
219                              ovs.jsonrpc.Message.type_to_string(msg.type)))
220
221         return initial_change_seqno != self.change_seqno
222
223     def wait(self, poller):
224         """Arranges for poller.block() to wake up when self.run() has something
225         to do or when activity occurs on a transaction on 'self'."""
226         self._session.wait(poller)
227         self._session.recv_wait(poller)
228
229     def has_ever_connected(self):
230         """Returns True, if the IDL successfully connected to the remote
231         database and retrieved its contents (even if the connection
232         subsequently dropped and is in the process of reconnecting).  If so,
233         then the IDL contains an atomic snapshot of the database's contents
234         (but it might be arbitrarily old if the connection dropped).
235
236         Returns False if the IDL has never connected or retrieved the
237         database's contents.  If so, the IDL is empty."""
238         return self.change_seqno != 0
239
240     def force_reconnect(self):
241         """Forces the IDL to drop its connection to the database and reconnect.
242         In the meantime, the contents of the IDL will not change."""
243         self._session.force_reconnect()
244
245     def set_lock(self, lock_name):
246         """If 'lock_name' is not None, configures the IDL to obtain the named
247         lock from the database server and to avoid modifying the database when
248         the lock cannot be acquired (that is, when another client has the same
249         lock).
250
251         If 'lock_name' is None, drops the locking requirement and releases the
252         lock."""
253         assert not self.txn
254         assert not self._outstanding_txns
255
256         if self.lock_name and (not lock_name or lock_name != self.lock_name):
257             # Release previous lock.
258             self.__send_unlock_request()
259             self.lock_name = None
260             self.is_lock_contended = False
261
262         if lock_name and not self.lock_name:
263             # Acquire new lock.
264             self.lock_name = lock_name
265             self.__send_lock_request()
266
267     def __clear(self):
268         changed = False
269
270         for table in self.tables.itervalues():
271             if table.rows:
272                 changed = True
273                 table.rows = {}
274
275         if changed:
276             self.change_seqno += 1
277
278     def __update_has_lock(self, new_has_lock):
279         if new_has_lock and not self.has_lock:
280             if self._monitor_request_id is None:
281                 self.change_seqno += 1
282             else:
283                 # We're waiting for a monitor reply, so don't signal that the
284                 # database changed.  The monitor reply will increment
285                 # change_seqno anyhow.
286                 pass
287             self.is_lock_contended = False
288         self.has_lock = new_has_lock
289
290     def __do_send_lock_request(self, method):
291         self.__update_has_lock(False)
292         self._lock_request_id = None
293         if self._session.is_connected():
294             msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
295             msg_id = msg.id
296             self._session.send(msg)
297         else:
298             msg_id = None
299         return msg_id
300
301     def __send_lock_request(self):
302         self._lock_request_id = self.__do_send_lock_request("lock")
303
304     def __send_unlock_request(self):
305         self.__do_send_lock_request("unlock")
306
307     def __parse_lock_reply(self, result):
308         self._lock_request_id = None
309         got_lock = type(result) == dict and result.get("locked") is True
310         self.__update_has_lock(got_lock)
311         if not got_lock:
312             self.is_lock_contended = True
313
314     def __parse_lock_notify(self, params, new_has_lock):
315         if (self.lock_name is not None
316             and type(params) in (list, tuple)
317             and params
318             and params[0] == self.lock_name):
319             self.__update_has_lock(self, new_has_lock)
320             if not new_has_lock:
321                 self.is_lock_contended = True
322
323     def __send_monitor_request(self):
324         monitor_requests = {}
325         for table in self.tables.itervalues():
326             monitor_requests[table.name] = {"columns": table.columns.keys()}
327         msg = ovs.jsonrpc.Message.create_request(
328             "monitor", [self._db.name, None, monitor_requests])
329         self._monitor_request_id = msg.id
330         self._session.send(msg)
331
332     def __parse_update(self, update):
333         try:
334             self.__do_parse_update(update)
335         except error.Error, e:
336             vlog.err("%s: error parsing update: %s"
337                      % (self._session.get_name(), e))
338
339     def __do_parse_update(self, table_updates):
340         if type(table_updates) != dict:
341             raise error.Error("<table-updates> is not an object",
342                               table_updates)
343
344         for table_name, table_update in table_updates.iteritems():
345             table = self.tables.get(table_name)
346             if not table:
347                 raise error.Error('<table-updates> includes unknown '
348                                   'table "%s"' % table_name)
349
350             if type(table_update) != dict:
351                 raise error.Error('<table-update> for table "%s" is not '
352                                   'an object' % table_name, table_update)
353
354             for uuid_string, row_update in table_update.iteritems():
355                 if not ovs.ovsuuid.is_valid_string(uuid_string):
356                     raise error.Error('<table-update> for table "%s" '
357                                       'contains bad UUID "%s" as member '
358                                       'name' % (table_name, uuid_string),
359                                       table_update)
360                 uuid = ovs.ovsuuid.from_string(uuid_string)
361
362                 if type(row_update) != dict:
363                     raise error.Error('<table-update> for table "%s" '
364                                       'contains <row-update> for %s that '
365                                       'is not an object'
366                                       % (table_name, uuid_string))
367
368                 parser = ovs.db.parser.Parser(row_update, "row-update")
369                 old = parser.get_optional("old", [dict])
370                 new = parser.get_optional("new", [dict])
371                 parser.finish()
372
373                 if not old and not new:
374                     raise error.Error('<row-update> missing "old" and '
375                                       '"new" members', row_update)
376
377                 if self.__process_update(table, uuid, old, new):
378                     self.change_seqno += 1
379
380     def __process_update(self, table, uuid, old, new):
381         """Returns True if a column changed, False otherwise."""
382         row = table.rows.get(uuid)
383         changed = False
384         if not new:
385             # Delete row.
386             if row:
387                 del table.rows[uuid]
388                 changed = True
389             else:
390                 # XXX rate-limit
391                 vlog.warn("cannot delete missing row %s from table %s"
392                           % (uuid, table.name))
393         elif not old:
394             # Insert row.
395             if not row:
396                 row = self.__create_row(table, uuid)
397                 changed = True
398             else:
399                 # XXX rate-limit
400                 vlog.warn("cannot add existing row %s to table %s"
401                           % (uuid, table.name))
402             if self.__row_update(table, row, new):
403                 changed = True
404         else:
405             if not row:
406                 row = self.__create_row(table, uuid)
407                 changed = True
408                 # XXX rate-limit
409                 vlog.warn("cannot modify missing row %s in table %s"
410                           % (uuid, table.name))
411             if self.__row_update(table, row, new):
412                 changed = True
413         return changed
414
415     def __row_update(self, table, row, row_json):
416         changed = False
417         for column_name, datum_json in row_json.iteritems():
418             column = table.columns.get(column_name)
419             if not column:
420                 # XXX rate-limit
421                 vlog.warn("unknown column %s updating table %s"
422                           % (column_name, table.name))
423                 continue
424
425             try:
426                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
427             except error.Error, e:
428                 # XXX rate-limit
429                 vlog.warn("error parsing column %s in table %s: %s"
430                           % (column_name, table.name, e))
431                 continue
432
433             if datum != row._data[column_name]:
434                 row._data[column_name] = datum
435                 if column.alert:
436                     changed = True
437             else:
438                 # Didn't really change but the OVSDB monitor protocol always
439                 # includes every value in a row.
440                 pass
441         return changed
442
443     def __create_row(self, table, uuid):
444         data = {}
445         for column in table.columns.itervalues():
446             data[column.name] = ovs.db.data.Datum.default(column.type)
447         row = table.rows[uuid] = Row(self, table, uuid, data)
448         return row
449
450     def __error(self):
451         self._session.force_reconnect()
452
453     def __txn_abort_all(self):
454         while self._outstanding_txns:
455             txn = self._outstanding_txns.popitem()[1]
456             txn._status = Transaction.TRY_AGAIN
457
458     def __txn_process_reply(self, msg):
459         txn = self._outstanding_txns.pop(msg.id, None)
460         if txn:
461             txn._process_reply(msg)
462
463
464 def _uuid_to_row(atom, base):
465     if base.ref_table:
466         return base.ref_table.rows.get(atom)
467     else:
468         return atom
469
470
471 def _row_to_uuid(value):
472     if type(value) == Row:
473         return value.uuid
474     else:
475         return value
476
477
478 class Row(object):
479     """A row within an IDL.
480
481     The client may access the following attributes directly:
482
483     - 'uuid': a uuid.UUID object whose value is the row's database UUID.
484
485     - An attribute for each column in the Row's table, named for the column,
486       whose values are as returned by Datum.to_python() for the column's type.
487
488       If some error occurs (e.g. the database server's idea of the column is
489       different from the IDL's idea), then the attribute values is the
490       "default" value return by Datum.default() for the column's type.  (It is
491       important to know this because the default value may violate constraints
492       for the column's type, e.g. the default integer value is 0 even if column
493       contraints require the column's value to be positive.)
494
495       When a transaction is active, column attributes may also be assigned new
496       values.  Committing the transaction will then cause the new value to be
497       stored into the database.
498
499       *NOTE*: In the current implementation, the value of a column is a *copy*
500       of the value in the database.  This means that modifying its value
501       directly will have no useful effect.  For example, the following:
502         row.mycolumn["a"] = "b"              # don't do this
503       will not change anything in the database, even after commit.  To modify
504       the column, instead assign the modified column value back to the column:
505         d = row.mycolumn
506         d["a"] = "b"
507         row.mycolumn = d
508 """
509     def __init__(self, idl, table, uuid, data):
510         # All of the explicit references to self.__dict__ below are required
511         # to set real attributes with invoking self.__getattr__().
512         self.__dict__["uuid"] = uuid
513
514         self.__dict__["_idl"] = idl
515         self.__dict__["_table"] = table
516
517         # _data is the committed data.  It takes the following values:
518         #
519         #   - A dictionary that maps every column name to a Datum, if the row
520         #     exists in the committed form of the database.
521         #
522         #   - None, if this row is newly inserted within the active transaction
523         #     and thus has no committed form.
524         self.__dict__["_data"] = data
525
526         # _changes describes changes to this row within the active transaction.
527         # It takes the following values:
528         #
529         #   - {}, the empty dictionary, if no transaction is active or if the
530         #     row has yet not been changed within this transaction.
531         #
532         #   - A dictionary that maps a column name to its new Datum, if an
533         #     active transaction changes those columns' values.
534         #
535         #   - A dictionary that maps every column name to a Datum, if the row
536         #     is newly inserted within the active transaction.
537         #
538         #   - None, if this transaction deletes this row.
539         self.__dict__["_changes"] = {}
540
541         # A dictionary whose keys are the names of columns that must be
542         # verified as prerequisites when the transaction commits.  The values
543         # in the dictionary are all None.
544         self.__dict__["_prereqs"] = {}
545
546     def __getattr__(self, column_name):
547         assert self._changes is not None
548
549         datum = self._changes.get(column_name)
550         if datum is None:
551             if self._data is None:
552                 raise AttributeError("%s instance has no attribute '%s'" %
553                                      (self.__class__.__name__, column_name))
554             datum = self._data[column_name]
555
556         return datum.to_python(_uuid_to_row)
557
558     def __setattr__(self, column_name, value):
559         assert self._changes is not None
560         assert self._idl.txn
561
562         column = self._table.columns[column_name]
563         try:
564             datum = ovs.db.data.Datum.from_python(column.type, value,
565                                                   _row_to_uuid)
566         except error.Error, e:
567             # XXX rate-limit
568             vlog.err("attempting to write bad value to column %s (%s)"
569                      % (column_name, e))
570             return
571         self._idl.txn._write(self, column, datum)
572
573     def verify(self, column_name):
574         """Causes the original contents of column 'column_name' in this row to
575         be verified as a prerequisite to completing the transaction.  That is,
576         if 'column_name' changed in this row (or if this row was deleted)
577         between the time that the IDL originally read its contents and the time
578         that the transaction commits, then the transaction aborts and
579         Transaction.commit() returns Transaction.TRY_AGAIN.
580
581         The intention is that, to ensure that no transaction commits based on
582         dirty reads, an application should call Row.verify() on each data item
583         read as part of a read-modify-write operation.
584
585         In some cases Row.verify() reduces to a no-op, because the current
586         value of the column is already known:
587
588           - If this row is a row created by the current transaction (returned
589             by Transaction.insert()).
590
591           - If the column has already been modified within the current
592             transaction.
593
594         Because of the latter property, always call Row.verify() *before*
595         modifying the column, for a given read-modify-write.
596
597         A transaction must be in progress."""
598         assert self._idl.txn
599         assert self._changes is not None
600         if not self._data or column_name in self._changes:
601             return
602
603         self._prereqs[column_name] = None
604
605     def delete(self):
606         """Deletes this row from its table.
607
608         A transaction must be in progress."""
609         assert self._idl.txn
610         assert self._changes is not None
611         if self._data is None:
612             del self._idl.txn._txn_rows[self.uuid]
613         else:
614             self._idl.txn._txn_rows[self.uuid] = self
615         self.__dict__["_changes"] = None
616         del self._table.rows[self.uuid]
617
618     def increment(self, column_name):
619         """Causes the transaction, when committed, to increment the value of
620         'column_name' within this row by 1.  'column_name' must have an integer
621         type.  After the transaction commits successfully, the client may
622         retrieve the final (incremented) value of 'column_name' with
623         Transaction.get_increment_new_value().
624
625         The client could accomplish something similar by reading and writing
626         and verify()ing columns.  However, increment() will never (by itself)
627         cause a transaction to fail because of a verify error.
628
629         The intended use is for incrementing the "next_cfg" column in
630         the Open_vSwitch table."""
631         self._idl.txn._increment(self, column_name)
632
633
634 def _uuid_name_from_uuid(uuid):
635     return "row%s" % str(uuid).replace("-", "_")
636
637
638 def _where_uuid_equals(uuid):
639     return [["_uuid", "==", ["uuid", str(uuid)]]]
640
641
642 class _InsertedRow(object):
643     def __init__(self, op_index):
644         self.op_index = op_index
645         self.real = None
646
647
648 class Transaction(object):
649     """A transaction may modify the contents of a database by modifying the
650     values of columns, deleting rows, inserting rows, or adding checks that
651     columns in the database have not changed ("verify" operations), through
652     Row methods.
653
654     Reading and writing columns and inserting and deleting rows are all
655     straightforward.  The reasons to verify columns are less obvious.
656     Verification is the key to maintaining transactional integrity.  Because
657     OVSDB handles multiple clients, it can happen that between the time that
658     OVSDB client A reads a column and writes a new value, OVSDB client B has
659     written that column.  Client A's write should not ordinarily overwrite
660     client B's, especially if the column in question is a "map" column that
661     contains several more or less independent data items.  If client A adds a
662     "verify" operation before it writes the column, then the transaction fails
663     in case client B modifies it first.  Client A will then see the new value
664     of the column and compose a new transaction based on the new contents
665     written by client B.
666
667     When a transaction is complete, which must be before the next call to
668     Idl.run(), call Transaction.commit() or Transaction.abort().
669
670     The life-cycle of a transaction looks like this:
671
672     1. Create the transaction and record the initial sequence number:
673
674         seqno = idl.change_seqno(idl)
675         txn = Transaction(idl)
676
677     2. Modify the database with Row and Transaction methods.
678
679     3. Commit the transaction by calling Transaction.commit().  The first call
680        to this function probably returns Transaction.INCOMPLETE.  The client
681        must keep calling again along as this remains true, calling Idl.run() in
682        between to let the IDL do protocol processing.  (If the client doesn't
683        have anything else to do in the meantime, it can use
684        Transaction.commit_block() to avoid having to loop itself.)
685
686     4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
687        to change from the saved 'seqno' (it's possible that it's already
688        changed, in which case the client should not wait at all), then start
689        over from step 1.  Only a call to Idl.run() will change the return value
690        of Idl.change_seqno.  (Transaction.commit_block() calls Idl.run().)"""
691
692     # Status values that Transaction.commit() can return.
693     UNCOMMITTED = "uncommitted"  # Not yet committed or aborted.
694     UNCHANGED = "unchanged"      # Transaction didn't include any changes.
695     INCOMPLETE = "incomplete"    # Commit in progress, please wait.
696     ABORTED = "aborted"          # ovsdb_idl_txn_abort() called.
697     SUCCESS = "success"          # Commit successful.
698     TRY_AGAIN = "try again"      # Commit failed because a "verify" operation
699                                  # reported an inconsistency, due to a network
700                                  # problem, or other transient failure.  Wait
701                                  # for a change, then try again.
702     NOT_LOCKED = "not locked"    # Server hasn't given us the lock yet.
703     ERROR = "error"              # Commit failed due to a hard error.
704
705     @staticmethod
706     def status_to_string(status):
707         """Converts one of the status values that Transaction.commit() can
708         return into a human-readable string.
709
710         (The status values are in fact such strings already, so
711         there's nothing to do.)"""
712         return status
713
714     def __init__(self, idl):
715         """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
716         A given Idl may only have a single active transaction at a time.
717
718         A Transaction may modify the contents of a database by assigning new
719         values to columns (attributes of Row), deleting rows (with
720         Row.delete()), or inserting rows (with Transaction.insert()).  It may
721         also check that columns in the database have not changed with
722         Row.verify().
723
724         When a transaction is complete (which must be before the next call to
725         Idl.run()), call Transaction.commit() or Transaction.abort()."""
726         assert idl.txn is None
727
728         idl.txn = self
729         self._request_id = None
730         self.idl = idl
731         self.dry_run = False
732         self._txn_rows = {}
733         self._status = Transaction.UNCOMMITTED
734         self._error = None
735         self._comments = []
736         self._commit_seqno = self.idl.change_seqno
737
738         self._inc_row = None
739         self._inc_column = None
740
741         self._inserted_rows = {}  # Map from UUID to _InsertedRow
742
743     def add_comment(self, comment):
744         """Appens 'comment' to the comments that will be passed to the OVSDB
745         server when this transaction is committed.  (The comment will be
746         committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
747         relatively human-readable form.)"""
748         self._comments.append(comment)
749
750     def wait(self, poller):
751         """Causes poll_block() to wake up if this transaction has completed
752         committing."""
753         if self._status not in (Transaction.UNCOMMITTED,
754                                 Transaction.INCOMPLETE):
755             poller.immediate_wake()
756
757     def _substitute_uuids(self, json):
758         if type(json) in (list, tuple):
759             if (len(json) == 2
760                 and json[0] == 'uuid'
761                 and ovs.ovsuuid.is_valid_string(json[1])):
762                 uuid = ovs.ovsuuid.from_string(json[1])
763                 row = self._txn_rows.get(uuid, None)
764                 if row and row._data is None:
765                     return ["named-uuid", _uuid_name_from_uuid(uuid)]
766             else:
767                 return [self._substitute_uuids(elem) for elem in json]
768         return json
769
770     def __disassemble(self):
771         self.idl.txn = None
772
773         for row in self._txn_rows.itervalues():
774             if row._changes is None:
775                 row._table.rows[row.uuid] = row
776             elif row._data is None:
777                 del row._table.rows[row.uuid]
778             row.__dict__["_changes"] = {}
779             row.__dict__["_prereqs"] = {}
780         self._txn_rows = {}
781
782     def commit(self):
783         """Attempts to commit 'txn'.  Returns the status of the commit
784         operation, one of the following constants:
785
786           Transaction.INCOMPLETE:
787
788               The transaction is in progress, but not yet complete.  The caller
789               should call again later, after calling Idl.run() to let the
790               IDL do OVSDB protocol processing.
791
792           Transaction.UNCHANGED:
793
794               The transaction is complete.  (It didn't actually change the
795               database, so the IDL didn't send any request to the database
796               server.)
797
798           Transaction.ABORTED:
799
800               The caller previously called Transaction.abort().
801
802           Transaction.SUCCESS:
803
804               The transaction was successful.  The update made by the
805               transaction (and possibly other changes made by other database
806               clients) should already be visible in the IDL.
807
808           Transaction.TRY_AGAIN:
809
810               The transaction failed for some transient reason, e.g. because a
811               "verify" operation reported an inconsistency or due to a network
812               problem.  The caller should wait for a change to the database,
813               then compose a new transaction, and commit the new transaction.
814
815               Use Idl.change_seqno to wait for a change in the database.  It is
816               important to use its value *before* the initial call to
817               Transaction.commit() as the baseline for this purpose, because
818               the change that one should wait for can happen after the initial
819               call but before the call that returns Transaction.TRY_AGAIN, and
820               using some other baseline value in that situation could cause an
821               indefinite wait if the database rarely changes.
822
823           Transaction.NOT_LOCKED:
824
825               The transaction failed because the IDL has been configured to
826               require a database lock (with Idl.set_lock()) but didn't
827               get it yet or has already lost it.
828
829         Committing a transaction rolls back all of the changes that it made to
830         the IDL's copy of the database.  If the transaction commits
831         successfully, then the database server will send an update and, thus,
832         the IDL will be updated with the committed changes."""
833         # The status can only change if we're the active transaction.
834         # (Otherwise, our status will change only in Idl.run().)
835         if self != self.idl.txn:
836             return self._status
837
838         # If we need a lock but don't have it, give up quickly.
839         if self.idl.lock_name and not self.idl.has_lock():
840             self._status = Transaction.NOT_LOCKED
841             self.__disassemble()
842             return self._status
843
844         operations = [self.idl._db.name]
845
846         # Assert that we have the required lock (avoiding a race).
847         if self.idl.lock_name:
848             operations.append({"op": "assert",
849                                "lock": self.idl.lock_name})
850
851         # Add prerequisites and declarations of new rows.
852         for row in self._txn_rows.itervalues():
853             if row._prereqs:
854                 rows = {}
855                 columns = []
856                 for column_name in row._prereqs:
857                     columns.append(column_name)
858                     rows[column_name] = row._data[column_name].to_json()
859                 operations.append({"op": "wait",
860                                    "table": row._table.name,
861                                    "timeout": 0,
862                                    "where": _where_uuid_equals(row.uuid),
863                                    "until": "==",
864                                    "columns": columns,
865                                    "rows": [rows]})
866
867         # Add updates.
868         any_updates = False
869         for row in self._txn_rows.itervalues():
870             if row._changes is None:
871                 if row._table.is_root:
872                     operations.append({"op": "delete",
873                                        "table": row._table.name,
874                                        "where": _where_uuid_equals(row.uuid)})
875                     any_updates = True
876                 else:
877                     # Let ovsdb-server decide whether to really delete it.
878                     pass
879             elif row._changes:
880                 op = {"table": row._table.name}
881                 if row._data is None:
882                     op["op"] = "insert"
883                     op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
884                     any_updates = True
885
886                     op_index = len(operations) - 1
887                     self._inserted_rows[row.uuid] = _InsertedRow(op_index)
888                 else:
889                     op["op"] = "update"
890                     op["where"] = _where_uuid_equals(row.uuid)
891
892                 row_json = {}
893                 op["row"] = row_json
894
895                 for column_name, datum in row._changes.iteritems():
896                     if row._data is not None or not datum.is_default():
897                         row_json[column_name] = (
898                                 self._substitute_uuids(datum.to_json()))
899
900                         # If anything really changed, consider it an update.
901                         # We can't suppress not-really-changed values earlier
902                         # or transactions would become nonatomic (see the big
903                         # comment inside Transaction._write()).
904                         if (not any_updates and row._data is not None and
905                             row._data[column_name] != datum):
906                             any_updates = True
907
908                 if row._data is None or row_json:
909                     operations.append(op)
910
911         # Add increment.
912         if self._inc_row and any_updates:
913             self._inc_index = len(operations) - 1
914
915             operations.append({"op": "mutate",
916                                "table": self._inc_row._table.name,
917                                "where": self._substitute_uuids(
918                                    _where_uuid_equals(self._inc_row.uuid)),
919                                "mutations": [[self._inc_column, "+=", 1]]})
920             operations.append({"op": "select",
921                                "table": self._inc_row._table.name,
922                                "where": self._substitute_uuids(
923                                    _where_uuid_equals(self._inc_row.uuid)),
924                                "columns": [self._inc_column]})
925
926         # Add comment.
927         if self._comments:
928             operations.append({"op": "comment",
929                                "comment": "\n".join(self._comments)})
930
931         # Dry run?
932         if self.dry_run:
933             operations.append({"op": "abort"})
934
935         if not any_updates:
936             self._status = Transaction.UNCHANGED
937         else:
938             msg = ovs.jsonrpc.Message.create_request("transact", operations)
939             self._request_id = msg.id
940             if not self.idl._session.send(msg):
941                 self.idl._outstanding_txns[self._request_id] = self
942                 self._status = Transaction.INCOMPLETE
943             else:
944                 self._status = Transaction.TRY_AGAIN
945
946         self.__disassemble()
947         return self._status
948
949     def commit_block(self):
950         """Attempts to commit this transaction, blocking until the commit
951         either succeeds or fails.  Returns the final commit status, which may
952         be any Transaction.* value other than Transaction.INCOMPLETE.
953
954         This function calls Idl.run() on this transaction'ss IDL, so it may
955         cause Idl.change_seqno to change."""
956         while True:
957             status = self.commit()
958             if status != Transaction.INCOMPLETE:
959                 return status
960
961             self.idl.run()
962
963             poller = ovs.poller.Poller()
964             self.idl.wait(poller)
965             self.wait(poller)
966             poller.block()
967
968     def get_increment_new_value(self):
969         """Returns the final (incremented) value of the column in this
970         transaction that was set to be incremented by Row.increment.  This
971         transaction must have committed successfully."""
972         assert self._status == Transaction.SUCCESS
973         return self._inc_new_value
974
975     def abort(self):
976         """Aborts this transaction.  If Transaction.commit() has already been
977         called then the transaction might get committed anyhow."""
978         self.__disassemble()
979         if self._status in (Transaction.UNCOMMITTED,
980                             Transaction.INCOMPLETE):
981             self._status = Transaction.ABORTED
982
983     def get_error(self):
984         """Returns a string representing this transaction's current status,
985         suitable for use in log messages."""
986         if self._status != Transaction.ERROR:
987             return Transaction.status_to_string(self._status)
988         elif self._error:
989             return self._error
990         else:
991             return "no error details available"
992
993     def __set_error_json(self, json):
994         if self._error is None:
995             self._error = ovs.json.to_string(json)
996
997     def get_insert_uuid(self, uuid):
998         """Finds and returns the permanent UUID that the database assigned to a
999         newly inserted row, given the UUID that Transaction.insert() assigned
1000         locally to that row.
1001
1002         Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
1003         or if it was assigned by that function and then deleted by Row.delete()
1004         within the same transaction.  (Rows that are inserted and then deleted
1005         within a single transaction are never sent to the database server, so
1006         it never assigns them a permanent UUID.)
1007
1008         This transaction must have completed successfully."""
1009         assert self._status in (Transaction.SUCCESS,
1010                                 Transaction.UNCHANGED)
1011         inserted_row = self._inserted_rows.get(uuid)
1012         if inserted_row:
1013             return inserted_row.real
1014         return None
1015
1016     def _increment(self, row, column):
1017         assert not self._inc_row
1018         self._inc_row = row
1019         self._inc_column = column
1020
1021     def _write(self, row, column, datum):
1022         assert row._changes is not None
1023
1024         txn = row._idl.txn
1025
1026         # If this is a write-only column and the datum being written is the
1027         # same as the one already there, just skip the update entirely.  This
1028         # is worth optimizing because we have a lot of columns that get
1029         # periodically refreshed into the database but don't actually change
1030         # that often.
1031         #
1032         # We don't do this for read/write columns because that would break
1033         # atomicity of transactions--some other client might have written a
1034         # different value in that column since we read it.  (But if a whole
1035         # transaction only does writes of existing values, without making any
1036         # real changes, we will drop the whole transaction later in
1037         # ovsdb_idl_txn_commit().)
1038         if not column.alert and row._data.get(column.name) == datum:
1039             new_value = row._changes.get(column.name)
1040             if new_value is None or new_value == datum:
1041                 return
1042
1043         txn._txn_rows[row.uuid] = row
1044         row._changes[column.name] = datum.copy()
1045
1046     def insert(self, table, new_uuid=None):
1047         """Inserts and returns a new row in 'table', which must be one of the
1048         ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
1049
1050         The new row is assigned a provisional UUID.  If 'uuid' is None then one
1051         is randomly generated; otherwise 'uuid' should specify a randomly
1052         generated uuid.UUID not otherwise in use.  ovsdb-server will assign a
1053         different UUID when 'txn' is committed, but the IDL will replace any
1054         uses of the provisional UUID in the data to be to be committed by the
1055         UUID assigned by ovsdb-server."""
1056         assert self._status == Transaction.UNCOMMITTED
1057         if new_uuid is None:
1058             new_uuid = uuid.uuid4()
1059         row = Row(self.idl, table, new_uuid, None)
1060         table.rows[row.uuid] = row
1061         self._txn_rows[row.uuid] = row
1062         return row
1063
1064     def _process_reply(self, msg):
1065         if msg.type == ovs.jsonrpc.Message.T_ERROR:
1066             self._status = Transaction.ERROR
1067         elif type(msg.result) not in (list, tuple):
1068             # XXX rate-limit
1069             vlog.warn('reply to "transact" is not JSON array')
1070         else:
1071             hard_errors = False
1072             soft_errors = False
1073             lock_errors = False
1074
1075             ops = msg.result
1076             for op in ops:
1077                 if op is None:
1078                     # This isn't an error in itself but indicates that some
1079                     # prior operation failed, so make sure that we know about
1080                     # it.
1081                     soft_errors = True
1082                 elif type(op) == dict:
1083                     error = op.get("error")
1084                     if error is not None:
1085                         if error == "timed out":
1086                             soft_errors = True
1087                         elif error == "not owner":
1088                             lock_errors = True
1089                         elif error == "aborted":
1090                             pass
1091                         else:
1092                             hard_errors = True
1093                             self.__set_error_json(op)
1094                 else:
1095                     hard_errors = True
1096                     self.__set_error_json(op)
1097                     # XXX rate-limit
1098                     vlog.warn("operation reply is not JSON null or object")
1099
1100             if not soft_errors and not hard_errors and not lock_errors:
1101                 if self._inc_row and not self.__process_inc_reply(ops):
1102                     hard_errors = True
1103
1104                 for insert in self._inserted_rows.itervalues():
1105                     if not self.__process_insert_reply(insert, ops):
1106                         hard_errors = True
1107
1108             if hard_errors:
1109                 self._status = Transaction.ERROR
1110             elif lock_errors:
1111                 self._status = Transaction.NOT_LOCKED
1112             elif soft_errors:
1113                 self._status = Transaction.TRY_AGAIN
1114             else:
1115                 self._status = Transaction.SUCCESS
1116
1117     @staticmethod
1118     def __check_json_type(json, types, name):
1119         if not json:
1120             # XXX rate-limit
1121             vlog.warn("%s is missing" % name)
1122             return False
1123         elif type(json) not in types:
1124             # XXX rate-limit
1125             vlog.warn("%s has unexpected type %s" % (name, type(json)))
1126             return False
1127         else:
1128             return True
1129
1130     def __process_inc_reply(self, ops):
1131         if self._inc_index + 2 > len(ops):
1132             # XXX rate-limit
1133             vlog.warn("reply does not contain enough operations for "
1134                       "increment (has %d, needs %d)" %
1135                       (len(ops), self._inc_index + 2))
1136
1137         # We know that this is a JSON object because the loop in
1138         # __process_reply() already checked.
1139         mutate = ops[self._inc_index]
1140         count = mutate.get("count")
1141         if not Transaction.__check_json_type(count, (int, long),
1142                                              '"mutate" reply "count"'):
1143             return False
1144         if count != 1:
1145             # XXX rate-limit
1146             vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
1147             return False
1148
1149         select = ops[self._inc_index + 1]
1150         rows = select.get("rows")
1151         if not Transaction.__check_json_type(rows, (list, tuple),
1152                                              '"select" reply "rows"'):
1153             return False
1154         if len(rows) != 1:
1155             # XXX rate-limit
1156             vlog.warn('"select" reply "rows" has %d elements '
1157                       'instead of 1' % len(rows))
1158             return False
1159         row = rows[0]
1160         if not Transaction.__check_json_type(row, (dict,),
1161                                              '"select" reply row'):
1162             return False
1163         column = row.get(self._inc_column)
1164         if not Transaction.__check_json_type(column, (int, long),
1165                                              '"select" reply inc column'):
1166             return False
1167         self._inc_new_value = column
1168         return True
1169
1170     def __process_insert_reply(self, insert, ops):
1171         if insert.op_index >= len(ops):
1172             # XXX rate-limit
1173             vlog.warn("reply does not contain enough operations "
1174                       "for insert (has %d, needs %d)"
1175                       % (len(ops), insert.op_index))
1176             return False
1177
1178         # We know that this is a JSON object because the loop in
1179         # __process_reply() already checked.
1180         reply = ops[insert.op_index]
1181         json_uuid = reply.get("uuid")
1182         if not Transaction.__check_json_type(json_uuid, (tuple, list),
1183                                              '"insert" reply "uuid"'):
1184             return False
1185
1186         try:
1187             uuid_ = ovs.ovsuuid.from_json(json_uuid)
1188         except error.Error:
1189             # XXX rate-limit
1190             vlog.warn('"insert" reply "uuid" is not a JSON UUID')
1191             return False
1192
1193         insert.real = uuid_
1194         return True
1195
1196
1197 class SchemaHelper(object):
1198     """IDL Schema helper.
1199
1200     This class encapsulates the logic required to generate schemas suitable
1201     for creating 'ovs.db.idl.Idl' objects.  Clients should register columns
1202     they are interested in using register_columns().  When finished, the
1203     get_idl_schema() function may be called.
1204
1205     The location on disk of the schema used may be found in the
1206     'schema_location' variable."""
1207
1208     def __init__(self, location=None, schema_json=None):
1209         """Creates a new Schema object.
1210
1211         'location' file path to ovs schema. None means default location
1212         'schema_json' schema in json preresentation in memory
1213         """
1214
1215         if location and schema_json:
1216             raise ValueError("both location and schema_json can't be "
1217                              "specified. it's ambiguous.")
1218         if schema_json is None:
1219             if location is None:
1220                 location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
1221             schema_json = ovs.json.from_file(location)
1222
1223         self.schema_json = schema_json
1224         self._tables = {}
1225         self._all = False
1226
1227     def register_columns(self, table, columns):
1228         """Registers interest in the given 'columns' of 'table'.  Future calls
1229         to get_idl_schema() will include 'table':column for each column in
1230         'columns'. This function automatically avoids adding duplicate entries
1231         to the schema.
1232
1233         'table' must be a string.
1234         'columns' must be a list of strings.
1235         """
1236
1237         assert type(table) is str
1238         assert type(columns) is list
1239
1240         columns = set(columns) | self._tables.get(table, set())
1241         self._tables[table] = columns
1242
1243     def register_table(self, table):
1244         """Registers interest in the given all columns of 'table'. Future calls
1245         to get_idl_schema() will include all columns of 'table'.
1246
1247         'table' must be a string
1248         """
1249         assert type(table) is str
1250         self._tables[table] = set()  # empty set means all columns in the table
1251
1252     def register_all(self):
1253         """Registers interest in every column of every table."""
1254         self._all = True
1255
1256     def get_idl_schema(self):
1257         """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
1258         object based on columns registered using the register_columns()
1259         function."""
1260
1261         schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
1262         self.schema_json = None
1263
1264         if not self._all:
1265             schema_tables = {}
1266             for table, columns in self._tables.iteritems():
1267                 schema_tables[table] = (
1268                     self._keep_table_columns(schema, table, columns))
1269
1270             schema.tables = schema_tables
1271         return schema
1272
1273     def _keep_table_columns(self, schema, table_name, columns):
1274         assert table_name in schema.tables
1275         table = schema.tables[table_name]
1276
1277         if not columns:
1278             # empty set means all columns in the table
1279             return table
1280
1281         new_columns = {}
1282         for column_name in columns:
1283             assert type(column_name) is str
1284             assert column_name in table.columns
1285
1286             new_columns[column_name] = table.columns[column_name]
1287
1288         table.columns = new_columns
1289         return table