ovsdb-idl: Improve ovsdb_idl_txn_increment() interface.
[sliver-openvswitch.git] / python / ovs / db / idl.py
1 # Copyright (c) 2009, 2010, 2011, 2012 Nicira Networks
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import uuid
16
17 import ovs.jsonrpc
18 import ovs.db.parser
19 import ovs.db.schema
20 from ovs.db import error
21 import ovs.ovsuuid
22 import ovs.poller
23 import ovs.vlog
24
25 vlog = ovs.vlog.Vlog("idl")
26
27 __pychecker__ = 'no-classattr no-objattrs'
28
29
30 class Idl:
31     """Open vSwitch Database Interface Definition Language (OVSDB IDL).
32
33     The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC
34     requests to an OVSDB database server and parses the responses, converting
35     raw JSON into data structures that are easier for clients to digest.
36
37     The IDL also assists with issuing database transactions.  The client
38     creates a transaction, manipulates the IDL data structures, and commits or
39     aborts the transaction.  The IDL then composes and issues the necessary
40     JSON-RPC requests and reports to the client whether the transaction
41     completed successfully.
42
43     The client is allowed to access the following attributes directly, in a
44     read-only fashion:
45
46     - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
47       to the Idl constructor.  Each ovs.db.schema.TableSchema in the map is
48       annotated with a new attribute 'rows', which is a dict from a uuid.UUID
49       to a Row object.
50
51       The client may directly read and write the Row objects referenced by the
52       'rows' map values.  Refer to Row for more details.
53
54     - 'change_seqno': A number that represents the IDL's state.  When the IDL
55       is updated (by Idl.run()), its value changes.
56
57     - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
58       if no lock is configured.
59
60     - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
61       lock, and False otherwise.
62
63       Locking and unlocking happens asynchronously from the database client's
64       point of view, so the information is only useful for optimization
65       (e.g. if the client doesn't have the lock then there's no point in trying
66       to write to the database).
67
68     - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
69       the database server has indicated that some other client already owns the
70       requested lock, and False otherwise.
71
72     - 'txn': The ovs.db.idl.Transaction object for the database transaction
73       currently being constructed, if there is one, or None otherwise.
74 """
75
76     def __init__(self, remote, schema):
77         """Creates and returns a connection to the database named 'db_name' on
78         'remote', which should be in a form acceptable to
79         ovs.jsonrpc.session.open().  The connection will maintain an in-memory
80         replica of the remote database.
81
82         'schema' should be the schema for the remote database.  The caller may
83         have cut it down by removing tables or columns that are not of
84         interest.  The IDL will only replicate the tables and columns that
85         remain.  The caller may also add a attribute named 'alert' to selected
86         remaining columns, setting its value to False; if so, then changes to
87         those columns will not be considered changes to the database for the
88         purpose of the return value of Idl.run() and Idl.change_seqno.  This is
89         useful for columns that the IDL's client will write but not read.
90
91         As a convenience to users, 'schema' may also be an instance of the
92         SchemaHelper class.
93
94         The IDL uses and modifies 'schema' directly."""
95
96         assert isinstance(schema, SchemaHelper)
97         schema = schema.get_idl_schema()
98
99         self.tables = schema.tables
100         self._db = schema
101         self._session = ovs.jsonrpc.Session.open(remote)
102         self._monitor_request_id = None
103         self._last_seqno = None
104         self.change_seqno = 0
105
106         # Database locking.
107         self.lock_name = None          # Name of lock we need, None if none.
108         self.has_lock = False          # Has db server said we have the lock?
109         self.is_lock_contended = False  # Has db server said we can't get lock?
110         self._lock_request_id = None   # JSON-RPC ID of in-flight lock request.
111
112         # Transaction support.
113         self.txn = None
114         self._outstanding_txns = {}
115
116         for table in schema.tables.itervalues():
117             for column in table.columns.itervalues():
118                 if not hasattr(column, 'alert'):
119                     column.alert = True
120             table.need_table = False
121             table.rows = {}
122             table.idl = self
123
124     def close(self):
125         """Closes the connection to the database.  The IDL will no longer
126         update."""
127         self._session.close()
128
129     def run(self):
130         """Processes a batch of messages from the database server.  Returns
131         True if the database as seen through the IDL changed, False if it did
132         not change.  The initial fetch of the entire contents of the remote
133         database is considered to be one kind of change.  If the IDL has been
134         configured to acquire a database lock (with Idl.set_lock()), then
135         successfully acquiring the lock is also considered to be a change.
136
137         This function can return occasional false positives, that is, report
138         that the database changed even though it didn't.  This happens if the
139         connection to the database drops and reconnects, which causes the
140         database contents to be reloaded even if they didn't change.  (It could
141         also happen if the database server sends out a "change" that reflects
142         what we already thought was in the database, but the database server is
143         not supposed to do that.)
144
145         As an alternative to checking the return value, the client may check
146         for changes in self.change_seqno."""
147         assert not self.txn
148         initial_change_seqno = self.change_seqno
149         self._session.run()
150         i = 0
151         while i < 50:
152             i += 1
153             if not self._session.is_connected():
154                 break
155
156             seqno = self._session.get_seqno()
157             if seqno != self._last_seqno:
158                 self._last_seqno = seqno
159                 self.__txn_abort_all()
160                 self.__send_monitor_request()
161                 if self.lock_name:
162                     self.__send_lock_request()
163                 break
164
165             msg = self._session.recv()
166             if msg is None:
167                 break
168             if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
169                 and msg.method == "update"
170                 and len(msg.params) == 2
171                 and msg.params[0] == None):
172                 # Database contents changed.
173                 self.__parse_update(msg.params[1])
174             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
175                   and self._monitor_request_id is not None
176                   and self._monitor_request_id == msg.id):
177                 # Reply to our "monitor" request.
178                 try:
179                     self.change_seqno += 1
180                     self._monitor_request_id = None
181                     self.__clear()
182                     self.__parse_update(msg.result)
183                 except error.Error, e:
184                     vlog.err("%s: parse error in received schema: %s"
185                               % (self._session.get_name(), e))
186                     self.__error()
187             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
188                   and self._lock_request_id is not None
189                   and self._lock_request_id == msg.id):
190                 # Reply to our "lock" request.
191                 self.__parse_lock_reply(msg.result)
192             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
193                   and msg.method == "locked"):
194                 # We got our lock.
195                 self.__parse_lock_notify(msg.params, True)
196             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
197                   and msg.method == "stolen"):
198                 # Someone else stole our lock.
199                 self.__parse_lock_notify(msg.params, False)
200             elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
201                 # Reply to our echo request.  Ignore it.
202                 pass
203             elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
204                                ovs.jsonrpc.Message.T_REPLY)
205                   and self.__txn_process_reply(msg)):
206                 # __txn_process_reply() did everything needed.
207                 pass
208             else:
209                 # This can happen if a transaction is destroyed before we
210                 # receive the reply, so keep the log level low.
211                 vlog.dbg("%s: received unexpected %s message"
212                          % (self._session.get_name(),
213                              ovs.jsonrpc.Message.type_to_string(msg.type)))
214
215         return initial_change_seqno != self.change_seqno
216
217     def wait(self, poller):
218         """Arranges for poller.block() to wake up when self.run() has something
219         to do or when activity occurs on a transaction on 'self'."""
220         self._session.wait(poller)
221         self._session.recv_wait(poller)
222
223     def has_ever_connected(self):
224         """Returns True, if the IDL successfully connected to the remote
225         database and retrieved its contents (even if the connection
226         subsequently dropped and is in the process of reconnecting).  If so,
227         then the IDL contains an atomic snapshot of the database's contents
228         (but it might be arbitrarily old if the connection dropped).
229
230         Returns False if the IDL has never connected or retrieved the
231         database's contents.  If so, the IDL is empty."""
232         return self.change_seqno != 0
233
234     def force_reconnect(self):
235         """Forces the IDL to drop its connection to the database and reconnect.
236         In the meantime, the contents of the IDL will not change."""
237         self._session.force_reconnect()
238
239     def set_lock(self, lock_name):
240         """If 'lock_name' is not None, configures the IDL to obtain the named
241         lock from the database server and to avoid modifying the database when
242         the lock cannot be acquired (that is, when another client has the same
243         lock).
244
245         If 'lock_name' is None, drops the locking requirement and releases the
246         lock."""
247         assert not self.txn
248         assert not self._outstanding_txns
249
250         if self.lock_name and (not lock_name or lock_name != self.lock_name):
251             # Release previous lock.
252             self.__send_unlock_request()
253             self.lock_name = None
254             self.is_lock_contended = False
255
256         if lock_name and not self.lock_name:
257             # Acquire new lock.
258             self.lock_name = lock_name
259             self.__send_lock_request()
260
261     def __clear(self):
262         changed = False
263
264         for table in self.tables.itervalues():
265             if table.rows:
266                 changed = True
267                 table.rows = {}
268
269         if changed:
270             self.change_seqno += 1
271
272     def __update_has_lock(self, new_has_lock):
273         if new_has_lock and not self.has_lock:
274             if self._monitor_request_id is None:
275                 self.change_seqno += 1
276             else:
277                 # We're waiting for a monitor reply, so don't signal that the
278                 # database changed.  The monitor reply will increment
279                 # change_seqno anyhow.
280                 pass
281             self.is_lock_contended = False
282         self.has_lock = new_has_lock
283
284     def __do_send_lock_request(self, method):
285         self.__update_has_lock(False)
286         self._lock_request_id = None
287         if self._session.is_connected():
288             msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
289             msg_id = msg.id
290             self._session.send(msg)
291         else:
292             msg_id = None
293         return msg_id
294
295     def __send_lock_request(self):
296         self._lock_request_id = self.__do_send_lock_request("lock")
297
298     def __send_unlock_request(self):
299         self.__do_send_lock_request("unlock")
300
301     def __parse_lock_reply(self, result):
302         self._lock_request_id = None
303         got_lock = type(result) == dict and result.get("locked") is True
304         self.__update_has_lock(got_lock)
305         if not got_lock:
306             self.is_lock_contended = True
307
308     def __parse_lock_notify(self, params, new_has_lock):
309         if (self.lock_name is not None
310             and type(params) in (list, tuple)
311             and params
312             and params[0] == self.lock_name):
313             self.__update_has_lock(self, new_has_lock)
314             if not new_has_lock:
315                 self.is_lock_contended = True
316
317     def __send_monitor_request(self):
318         monitor_requests = {}
319         for table in self.tables.itervalues():
320             monitor_requests[table.name] = {"columns": table.columns.keys()}
321         msg = ovs.jsonrpc.Message.create_request(
322             "monitor", [self._db.name, None, monitor_requests])
323         self._monitor_request_id = msg.id
324         self._session.send(msg)
325
326     def __parse_update(self, update):
327         try:
328             self.__do_parse_update(update)
329         except error.Error, e:
330             vlog.err("%s: error parsing update: %s"
331                      % (self._session.get_name(), e))
332
333     def __do_parse_update(self, table_updates):
334         if type(table_updates) != dict:
335             raise error.Error("<table-updates> is not an object",
336                               table_updates)
337
338         for table_name, table_update in table_updates.iteritems():
339             table = self.tables.get(table_name)
340             if not table:
341                 raise error.Error('<table-updates> includes unknown '
342                                   'table "%s"' % table_name)
343
344             if type(table_update) != dict:
345                 raise error.Error('<table-update> for table "%s" is not '
346                                   'an object' % table_name, table_update)
347
348             for uuid_string, row_update in table_update.iteritems():
349                 if not ovs.ovsuuid.is_valid_string(uuid_string):
350                     raise error.Error('<table-update> for table "%s" '
351                                       'contains bad UUID "%s" as member '
352                                       'name' % (table_name, uuid_string),
353                                       table_update)
354                 uuid = ovs.ovsuuid.from_string(uuid_string)
355
356                 if type(row_update) != dict:
357                     raise error.Error('<table-update> for table "%s" '
358                                       'contains <row-update> for %s that '
359                                       'is not an object'
360                                       % (table_name, uuid_string))
361
362                 parser = ovs.db.parser.Parser(row_update, "row-update")
363                 old = parser.get_optional("old", [dict])
364                 new = parser.get_optional("new", [dict])
365                 parser.finish()
366
367                 if not old and not new:
368                     raise error.Error('<row-update> missing "old" and '
369                                       '"new" members', row_update)
370
371                 if self.__process_update(table, uuid, old, new):
372                     self.change_seqno += 1
373
374     def __process_update(self, table, uuid, old, new):
375         """Returns True if a column changed, False otherwise."""
376         row = table.rows.get(uuid)
377         changed = False
378         if not new:
379             # Delete row.
380             if row:
381                 del table.rows[uuid]
382                 changed = True
383             else:
384                 # XXX rate-limit
385                 vlog.warn("cannot delete missing row %s from table %s"
386                           % (uuid, table.name))
387         elif not old:
388             # Insert row.
389             if not row:
390                 row = self.__create_row(table, uuid)
391                 changed = True
392             else:
393                 # XXX rate-limit
394                 vlog.warn("cannot add existing row %s to table %s"
395                           % (uuid, table.name))
396             if self.__row_update(table, row, new):
397                 changed = True
398         else:
399             if not row:
400                 row = self.__create_row(table, uuid)
401                 changed = True
402                 # XXX rate-limit
403                 vlog.warn("cannot modify missing row %s in table %s"
404                           % (uuid, table.name))
405             if self.__row_update(table, row, new):
406                 changed = True
407         return changed
408
409     def __row_update(self, table, row, row_json):
410         changed = False
411         for column_name, datum_json in row_json.iteritems():
412             column = table.columns.get(column_name)
413             if not column:
414                 # XXX rate-limit
415                 vlog.warn("unknown column %s updating table %s"
416                           % (column_name, table.name))
417                 continue
418
419             try:
420                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
421             except error.Error, e:
422                 # XXX rate-limit
423                 vlog.warn("error parsing column %s in table %s: %s"
424                           % (column_name, table.name, e))
425                 continue
426
427             if datum != row._data[column_name]:
428                 row._data[column_name] = datum
429                 if column.alert:
430                     changed = True
431             else:
432                 # Didn't really change but the OVSDB monitor protocol always
433                 # includes every value in a row.
434                 pass
435         return changed
436
437     def __create_row(self, table, uuid):
438         data = {}
439         for column in table.columns.itervalues():
440             data[column.name] = ovs.db.data.Datum.default(column.type)
441         row = table.rows[uuid] = Row(self, table, uuid, data)
442         return row
443
444     def __error(self):
445         self._session.force_reconnect()
446
447     def __txn_abort_all(self):
448         while self._outstanding_txns:
449             txn = self._outstanding_txns.popitem()[1]
450             txn._status = Transaction.TRY_AGAIN
451
452     def __txn_process_reply(self, msg):
453         txn = self._outstanding_txns.pop(msg.id, None)
454         if txn:
455             txn._process_reply(msg)
456
457
458 def _uuid_to_row(atom, base):
459     if base.ref_table:
460         return base.ref_table.rows.get(atom)
461     else:
462         return atom
463
464
465 def _row_to_uuid(value):
466     if type(value) == Row:
467         return value.uuid
468     else:
469         return value
470
471
472 class Row(object):
473     """A row within an IDL.
474
475     The client may access the following attributes directly:
476
477     - 'uuid': a uuid.UUID object whose value is the row's database UUID.
478
479     - An attribute for each column in the Row's table, named for the column,
480       whose values are as returned by Datum.to_python() for the column's type.
481
482       If some error occurs (e.g. the database server's idea of the column is
483       different from the IDL's idea), then the attribute values is the
484       "default" value return by Datum.default() for the column's type.  (It is
485       important to know this because the default value may violate constraints
486       for the column's type, e.g. the default integer value is 0 even if column
487       contraints require the column's value to be positive.)
488
489       When a transaction is active, column attributes may also be assigned new
490       values.  Committing the transaction will then cause the new value to be
491       stored into the database.
492
493       *NOTE*: In the current implementation, the value of a column is a *copy*
494       of the value in the database.  This means that modifying its value
495       directly will have no useful effect.  For example, the following:
496         row.mycolumn["a"] = "b"              # don't do this
497       will not change anything in the database, even after commit.  To modify
498       the column, instead assign the modified column value back to the column:
499         d = row.mycolumn
500         d["a"] = "b"
501         row.mycolumn = d
502 """
503     def __init__(self, idl, table, uuid, data):
504         # All of the explicit references to self.__dict__ below are required
505         # to set real attributes with invoking self.__getattr__().
506         self.__dict__["uuid"] = uuid
507
508         self.__dict__["_idl"] = idl
509         self.__dict__["_table"] = table
510
511         # _data is the committed data.  It takes the following values:
512         #
513         #   - A dictionary that maps every column name to a Datum, if the row
514         #     exists in the committed form of the database.
515         #
516         #   - None, if this row is newly inserted within the active transaction
517         #     and thus has no committed form.
518         self.__dict__["_data"] = data
519
520         # _changes describes changes to this row within the active transaction.
521         # It takes the following values:
522         #
523         #   - {}, the empty dictionary, if no transaction is active or if the
524         #     row has yet not been changed within this transaction.
525         #
526         #   - A dictionary that maps a column name to its new Datum, if an
527         #     active transaction changes those columns' values.
528         #
529         #   - A dictionary that maps every column name to a Datum, if the row
530         #     is newly inserted within the active transaction.
531         #
532         #   - None, if this transaction deletes this row.
533         self.__dict__["_changes"] = {}
534
535         # A dictionary whose keys are the names of columns that must be
536         # verified as prerequisites when the transaction commits.  The values
537         # in the dictionary are all None.
538         self.__dict__["_prereqs"] = {}
539
540     def __getattr__(self, column_name):
541         assert self._changes is not None
542
543         datum = self._changes.get(column_name)
544         if datum is None:
545             datum = self._data[column_name]
546
547         return datum.to_python(_uuid_to_row)
548
549     def __setattr__(self, column_name, value):
550         assert self._changes is not None
551         assert self._idl.txn
552
553         column = self._table.columns[column_name]
554         try:
555             datum = ovs.db.data.Datum.from_python(column.type, value,
556                                                   _row_to_uuid)
557         except error.Error, e:
558             # XXX rate-limit
559             vlog.err("attempting to write bad value to column %s (%s)"
560                      % (column_name, e))
561             return
562         self._idl.txn._write(self, column, datum)
563
564     def verify(self, column_name):
565         """Causes the original contents of column 'column_name' in this row to
566         be verified as a prerequisite to completing the transaction.  That is,
567         if 'column_name' changed in this row (or if this row was deleted)
568         between the time that the IDL originally read its contents and the time
569         that the transaction commits, then the transaction aborts and
570         Transaction.commit() returns Transaction.TRY_AGAIN.
571
572         The intention is that, to ensure that no transaction commits based on
573         dirty reads, an application should call Row.verify() on each data item
574         read as part of a read-modify-write operation.
575
576         In some cases Row.verify() reduces to a no-op, because the current
577         value of the column is already known:
578
579           - If this row is a row created by the current transaction (returned
580             by Transaction.insert()).
581
582           - If the column has already been modified within the current
583             transaction.
584
585         Because of the latter property, always call Row.verify() *before*
586         modifying the column, for a given read-modify-write.
587
588         A transaction must be in progress."""
589         assert self._idl.txn
590         assert self._changes is not None
591         if not self._data or column_name in self._changes:
592             return
593
594         self._prereqs[column_name] = None
595
596     def delete(self):
597         """Deletes this row from its table.
598
599         A transaction must be in progress."""
600         assert self._idl.txn
601         assert self._changes is not None
602         if self._data is None:
603             del self._idl.txn._txn_rows[self.uuid]
604         self.__dict__["_changes"] = None
605         del self._table.rows[self.uuid]
606
607     def increment(self, column_name):
608         """Causes the transaction, when committed, to increment the value of
609         'column_name' within this row by 1.  'column_name' must have an integer
610         type.  After the transaction commits successfully, the client may
611         retrieve the final (incremented) value of 'column_name' with
612         Transaction.get_increment_new_value().
613
614         The client could accomplish something similar by reading and writing
615         and verify()ing columns.  However, increment() will never (by itself)
616         cause a transaction to fail because of a verify error.
617
618         The intended use is for incrementing the "next_cfg" column in
619         the Open_vSwitch table."""
620         self._idl.txn._increment(self, column_name)
621
622
623 def _uuid_name_from_uuid(uuid):
624     return "row%s" % str(uuid).replace("-", "_")
625
626
627 def _where_uuid_equals(uuid):
628     return [["_uuid", "==", ["uuid", str(uuid)]]]
629
630
631 class _InsertedRow(object):
632     def __init__(self, op_index):
633         self.op_index = op_index
634         self.real = None
635
636
637 class Transaction(object):
638     # Status values that Transaction.commit() can return.
639     UNCOMMITTED = "uncommitted"  # Not yet committed or aborted.
640     UNCHANGED = "unchanged"      # Transaction didn't include any changes.
641     INCOMPLETE = "incomplete"    # Commit in progress, please wait.
642     ABORTED = "aborted"          # ovsdb_idl_txn_abort() called.
643     SUCCESS = "success"          # Commit successful.
644     TRY_AGAIN = "try again"      # Commit failed because a "verify" operation
645                                  # reported an inconsistency, due to a network
646                                  # problem, or other transient failure.  Wait
647                                  # for a change, then try again.
648     NOT_LOCKED = "not locked"    # Server hasn't given us the lock yet.
649     ERROR = "error"              # Commit failed due to a hard error.
650
651     @staticmethod
652     def status_to_string(status):
653         """Converts one of the status values that Transaction.commit() can
654         return into a human-readable string.
655
656         (The status values are in fact such strings already, so
657         there's nothing to do.)"""
658         return status
659
660     def __init__(self, idl):
661         """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
662         A given Idl may only have a single active transaction at a time.
663
664         A Transaction may modify the contents of a database by assigning new
665         values to columns (attributes of Row), deleting rows (with
666         Row.delete()), or inserting rows (with Transaction.insert()).  It may
667         also check that columns in the database have not changed with
668         Row.verify().
669
670         When a transaction is complete (which must be before the next call to
671         Idl.run()), call Transaction.commit() or Transaction.abort()."""
672         assert idl.txn is None
673
674         idl.txn = self
675         self._request_id = None
676         self.idl = idl
677         self.dry_run = False
678         self._txn_rows = {}
679         self._status = Transaction.UNCOMMITTED
680         self._error = None
681         self._comments = []
682         self._commit_seqno = self.idl.change_seqno
683
684         self._inc_row = None
685         self._inc_column = None
686
687         self._inserted_rows = {}  # Map from UUID to _InsertedRow
688
689     def add_comment(self, comment):
690         """Appens 'comment' to the comments that will be passed to the OVSDB
691         server when this transaction is committed.  (The comment will be
692         committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
693         relatively human-readable form.)"""
694         self._comments.append(comment)
695
696     def wait(self, poller):
697         if self._status not in (Transaction.UNCOMMITTED,
698                                 Transaction.INCOMPLETE):
699             poller.immediate_wake()
700
701     def _substitute_uuids(self, json):
702         if type(json) in (list, tuple):
703             if (len(json) == 2
704                 and json[0] == 'uuid'
705                 and ovs.ovsuuid.is_valid_string(json[1])):
706                 uuid = ovs.ovsuuid.from_string(json[1])
707                 row = self._txn_rows.get(uuid, None)
708                 if row and row._data is None:
709                     return ["named-uuid", _uuid_name_from_uuid(uuid)]
710         return json
711
712     def __disassemble(self):
713         self.idl.txn = None
714
715         for row in self._txn_rows.itervalues():
716             if row._changes is None:
717                 row._table.rows[row.uuid] = row
718             elif row._data is None:
719                 del row._table.rows[row.uuid]
720             row.__dict__["_changes"] = {}
721             row.__dict__["_prereqs"] = {}
722         self._txn_rows = {}
723
724     def commit(self):
725         """Attempts to commit this transaction and returns the status of the
726         commit operation, one of the constants declared as class attributes.
727         If the return value is Transaction.INCOMPLETE, then the transaction is
728         not yet complete and the caller should try calling again later, after
729         calling Idl.run() to run the Idl.
730
731         Committing a transaction rolls back all of the changes that it made to
732         the Idl's copy of the database.  If the transaction commits
733         successfully, then the database server will send an update and, thus,
734         the Idl will be updated with the committed changes."""
735         # The status can only change if we're the active transaction.
736         # (Otherwise, our status will change only in Idl.run().)
737         if self != self.idl.txn:
738             return self._status
739
740         # If we need a lock but don't have it, give up quickly.
741         if self.idl.lock_name and not self.idl.has_lock():
742             self._status = Transaction.NOT_LOCKED
743             self.__disassemble()
744             return self._status
745
746         operations = [self.idl._db.name]
747
748         # Assert that we have the required lock (avoiding a race).
749         if self.idl.lock_name:
750             operations.append({"op": "assert",
751                                "lock": self.idl.lock_name})
752
753         # Add prerequisites and declarations of new rows.
754         for row in self._txn_rows.itervalues():
755             if row._prereqs:
756                 rows = {}
757                 columns = []
758                 for column_name in row._prereqs:
759                     columns.append(column_name)
760                     rows[column_name] = row._data[column_name].to_json()
761                 operations.append({"op": "wait",
762                                    "table": row._table.name,
763                                    "timeout": 0,
764                                    "where": _where_uuid_equals(row.uuid),
765                                    "until": "==",
766                                    "columns": columns,
767                                    "rows": [rows]})
768
769         # Add updates.
770         any_updates = False
771         for row in self._txn_rows.itervalues():
772             if row._changes is None:
773                 if row._table.is_root:
774                     operations.append({"op": "delete",
775                                        "table": row._table.name,
776                                        "where": _where_uuid_equals(row.uuid)})
777                     any_updates = True
778                 else:
779                     # Let ovsdb-server decide whether to really delete it.
780                     pass
781             elif row._changes:
782                 op = {"table": row._table.name}
783                 if row._data is None:
784                     op["op"] = "insert"
785                     op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
786                     any_updates = True
787
788                     op_index = len(operations) - 1
789                     self._inserted_rows[row.uuid] = _InsertedRow(op_index)
790                 else:
791                     op["op"] = "update"
792                     op["where"] = _where_uuid_equals(row.uuid)
793
794                 row_json = {}
795                 op["row"] = row_json
796
797                 for column_name, datum in row._changes.iteritems():
798                     if row._data is not None or not datum.is_default():
799                         row_json[column_name] = (
800                                 self._substitute_uuids(datum.to_json()))
801
802                         # If anything really changed, consider it an update.
803                         # We can't suppress not-really-changed values earlier
804                         # or transactions would become nonatomic (see the big
805                         # comment inside Transaction._write()).
806                         if (not any_updates and row._data is not None and
807                             row._data[column_name] != datum):
808                             any_updates = True
809
810                 if row._data is None or row_json:
811                     operations.append(op)
812
813         # Add increment.
814         if self._inc_row and any_updates:
815             self._inc_index = len(operations) - 1
816
817             operations.append({"op": "mutate",
818                                "table": self._inc_row._table.name,
819                                "where": self._substitute_uuids(
820                                    _where_uuid_equals(self._inc_row.uuid)),
821                                "mutations": [[self._inc_column, "+=", 1]]})
822             operations.append({"op": "select",
823                                "table": self._inc_row._table.name,
824                                "where": self._substitute_uuids(
825                                    _where_uuid_equals(self._inc_row.uuid)),
826                                "columns": [self._inc_column]})
827
828         # Add comment.
829         if self._comments:
830             operations.append({"op": "comment",
831                                "comment": "\n".join(self._comments)})
832
833         # Dry run?
834         if self.dry_run:
835             operations.append({"op": "abort"})
836
837         if not any_updates:
838             self._status = Transaction.UNCHANGED
839         else:
840             msg = ovs.jsonrpc.Message.create_request("transact", operations)
841             self._request_id = msg.id
842             if not self.idl._session.send(msg):
843                 self.idl._outstanding_txns[self._request_id] = self
844                 self._status = Transaction.INCOMPLETE
845             else:
846                 self._status = Transaction.TRY_AGAIN
847
848         self.__disassemble()
849         return self._status
850
851     def commit_block(self):
852         while True:
853             status = self.commit()
854             if status != Transaction.INCOMPLETE:
855                 return status
856
857             self.idl.run()
858
859             poller = ovs.poller.Poller()
860             self.idl.wait(poller)
861             self.wait(poller)
862             poller.block()
863
864     def get_increment_new_value(self):
865         assert self._status == Transaction.SUCCESS
866         return self._inc_new_value
867
868     def abort(self):
869         """Aborts this transaction.  If Transaction.commit() has already been
870         called then the transaction might get committed anyhow."""
871         self.__disassemble()
872         if self._status in (Transaction.UNCOMMITTED,
873                             Transaction.INCOMPLETE):
874             self._status = Transaction.ABORTED
875
876     def get_error(self):
877         """Returns a string representing this transaction's current status,
878         suitable for use in log messages."""
879         if self._status != Transaction.ERROR:
880             return Transaction.status_to_string(self._status)
881         elif self._error:
882             return self._error
883         else:
884             return "no error details available"
885
886     def __set_error_json(self, json):
887         if self._error is None:
888             self._error = ovs.json.to_string(json)
889
890     def get_insert_uuid(self, uuid):
891         """Finds and returns the permanent UUID that the database assigned to a
892         newly inserted row, given the UUID that Transaction.insert() assigned
893         locally to that row.
894
895         Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
896         or if it was assigned by that function and then deleted by Row.delete()
897         within the same transaction.  (Rows that are inserted and then deleted
898         within a single transaction are never sent to the database server, so
899         it never assigns them a permanent UUID.)
900
901         This transaction must have completed successfully."""
902         assert self._status in (Transaction.SUCCESS,
903                                 Transaction.UNCHANGED)
904         inserted_row = self._inserted_rows.get(uuid)
905         if inserted_row:
906             return inserted_row.real
907         return None
908
909     def _increment(self, row, column):
910         assert not self._inc_row
911         self._inc_row = row
912         self._inc_column = column
913
914     def _write(self, row, column, datum):
915         assert row._changes is not None
916
917         txn = row._idl.txn
918
919         # If this is a write-only column and the datum being written is the
920         # same as the one already there, just skip the update entirely.  This
921         # is worth optimizing because we have a lot of columns that get
922         # periodically refreshed into the database but don't actually change
923         # that often.
924         #
925         # We don't do this for read/write columns because that would break
926         # atomicity of transactions--some other client might have written a
927         # different value in that column since we read it.  (But if a whole
928         # transaction only does writes of existing values, without making any
929         # real changes, we will drop the whole transaction later in
930         # ovsdb_idl_txn_commit().)
931         if not column.alert and row._data.get(column.name) == datum:
932             new_value = row._changes.get(column.name)
933             if new_value is None or new_value == datum:
934                 return
935
936         txn._txn_rows[row.uuid] = row
937         row._changes[column.name] = datum.copy()
938
939     def insert(self, table, new_uuid=None):
940         """Inserts and returns a new row in 'table', which must be one of the
941         ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
942
943         The new row is assigned a provisional UUID.  If 'uuid' is None then one
944         is randomly generated; otherwise 'uuid' should specify a randomly
945         generated uuid.UUID not otherwise in use.  ovsdb-server will assign a
946         different UUID when 'txn' is committed, but the IDL will replace any
947         uses of the provisional UUID in the data to be to be committed by the
948         UUID assigned by ovsdb-server."""
949         assert self._status == Transaction.UNCOMMITTED
950         if new_uuid is None:
951             new_uuid = uuid.uuid4()
952         row = Row(self.idl, table, new_uuid, None)
953         table.rows[row.uuid] = row
954         self._txn_rows[row.uuid] = row
955         return row
956
957     def _process_reply(self, msg):
958         if msg.type == ovs.jsonrpc.Message.T_ERROR:
959             self._status = Transaction.ERROR
960         elif type(msg.result) not in (list, tuple):
961             # XXX rate-limit
962             vlog.warn('reply to "transact" is not JSON array')
963         else:
964             hard_errors = False
965             soft_errors = False
966             lock_errors = False
967
968             ops = msg.result
969             for op in ops:
970                 if op is None:
971                     # This isn't an error in itself but indicates that some
972                     # prior operation failed, so make sure that we know about
973                     # it.
974                     soft_errors = True
975                 elif type(op) == dict:
976                     error = op.get("error")
977                     if error is not None:
978                         if error == "timed out":
979                             soft_errors = True
980                         elif error == "not owner":
981                             lock_errors = True
982                         elif error == "aborted":
983                             pass
984                         else:
985                             hard_errors = True
986                             self.__set_error_json(op)
987                 else:
988                     hard_errors = True
989                     self.__set_error_json(op)
990                     # XXX rate-limit
991                     vlog.warn("operation reply is not JSON null or object")
992
993             if not soft_errors and not hard_errors and not lock_errors:
994                 if self._inc_row and not self.__process_inc_reply(ops):
995                     hard_errors = True
996
997                 for insert in self._inserted_rows.itervalues():
998                     if not self.__process_insert_reply(insert, ops):
999                         hard_errors = True
1000
1001             if hard_errors:
1002                 self._status = Transaction.ERROR
1003             elif lock_errors:
1004                 self._status = Transaction.NOT_LOCKED
1005             elif soft_errors:
1006                 self._status = Transaction.TRY_AGAIN
1007             else:
1008                 self._status = Transaction.SUCCESS
1009
1010     @staticmethod
1011     def __check_json_type(json, types, name):
1012         if not json:
1013             # XXX rate-limit
1014             vlog.warn("%s is missing" % name)
1015             return False
1016         elif type(json) not in types:
1017             # XXX rate-limit
1018             vlog.warn("%s has unexpected type %s" % (name, type(json)))
1019             return False
1020         else:
1021             return True
1022
1023     def __process_inc_reply(self, ops):
1024         if self._inc_index + 2 > len(ops):
1025             # XXX rate-limit
1026             vlog.warn("reply does not contain enough operations for "
1027                       "increment (has %d, needs %d)" %
1028                       (len(ops), self._inc_index + 2))
1029
1030         # We know that this is a JSON object because the loop in
1031         # __process_reply() already checked.
1032         mutate = ops[self._inc_index]
1033         count = mutate.get("count")
1034         if not Transaction.__check_json_type(count, (int, long),
1035                                              '"mutate" reply "count"'):
1036             return False
1037         if count != 1:
1038             # XXX rate-limit
1039             vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
1040             return False
1041
1042         select = ops[self._inc_index + 1]
1043         rows = select.get("rows")
1044         if not Transaction.__check_json_type(rows, (list, tuple),
1045                                              '"select" reply "rows"'):
1046             return False
1047         if len(rows) != 1:
1048             # XXX rate-limit
1049             vlog.warn('"select" reply "rows" has %d elements '
1050                       'instead of 1' % len(rows))
1051             return False
1052         row = rows[0]
1053         if not Transaction.__check_json_type(row, (dict,),
1054                                              '"select" reply row'):
1055             return False
1056         column = row.get(self._inc_column)
1057         if not Transaction.__check_json_type(column, (int, long),
1058                                              '"select" reply inc column'):
1059             return False
1060         self._inc_new_value = column
1061         return True
1062
1063     def __process_insert_reply(self, insert, ops):
1064         if insert.op_index >= len(ops):
1065             # XXX rate-limit
1066             vlog.warn("reply does not contain enough operations "
1067                       "for insert (has %d, needs %d)"
1068                       % (len(ops), insert.op_index))
1069             return False
1070
1071         # We know that this is a JSON object because the loop in
1072         # __process_reply() already checked.
1073         reply = ops[insert.op_index]
1074         json_uuid = reply.get("uuid")
1075         if not Transaction.__check_json_type(json_uuid, (tuple, list),
1076                                              '"insert" reply "uuid"'):
1077             return False
1078
1079         try:
1080             uuid_ = ovs.ovsuuid.from_json(json_uuid)
1081         except error.Error:
1082             # XXX rate-limit
1083             vlog.warn('"insert" reply "uuid" is not a JSON UUID')
1084             return False
1085
1086         insert.real = uuid_
1087         return True
1088
1089
1090 class SchemaHelper(object):
1091     """IDL Schema helper.
1092
1093     This class encapsulates the logic required to generate schemas suitable
1094     for creating 'ovs.db.idl.Idl' objects.  Clients should register columns
1095     they are interested in using register_columns().  When finished, the
1096     get_idl_schema() function may be called.
1097
1098     The location on disk of the schema used may be found in the
1099     'schema_location' variable."""
1100
1101     def __init__(self, location=None):
1102         """Creates a new Schema object."""
1103
1104         if location is None:
1105             location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
1106
1107         self.schema_location = location
1108         self._tables = {}
1109         self._all = False
1110
1111     def register_columns(self, table, columns):
1112         """Registers interest in the given 'columns' of 'table'.  Future calls
1113         to get_idl_schema() will include 'table':column for each column in
1114         'columns'. This function automatically avoids adding duplicate entries
1115         to the schema.
1116
1117         'table' must be a string.
1118         'columns' must be a list of strings.
1119         """
1120
1121         assert type(table) is str
1122         assert type(columns) is list
1123
1124         columns = set(columns) | self._tables.get(table, set())
1125         self._tables[table] = columns
1126
1127     def register_all(self):
1128         """Registers interest in every column of every table."""
1129         self._all = True
1130
1131     def get_idl_schema(self):
1132         """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
1133         object based on columns registered using the register_columns()
1134         function."""
1135
1136         schema = ovs.db.schema.DbSchema.from_json(
1137             ovs.json.from_file(self.schema_location))
1138
1139         if not self._all:
1140             schema_tables = {}
1141             for table, columns in self._tables.iteritems():
1142                 schema_tables[table] = (
1143                     self._keep_table_columns(schema, table, columns))
1144
1145             schema.tables = schema_tables
1146         return schema
1147
1148     def _keep_table_columns(self, schema, table_name, columns):
1149         assert table_name in schema.tables
1150         table = schema.tables[table_name]
1151
1152         new_columns = {}
1153         for column_name in columns:
1154             assert type(column_name) is str
1155             assert column_name in table.columns
1156
1157             new_columns[column_name] = table.columns[column_name]
1158
1159         table.columns = new_columns
1160         return table