idl: Convert python daemons to utilize SchemaHelper.
[sliver-openvswitch.git] / python / ovs / db / idl.py
1 # Copyright (c) 2009, 2010, 2011, 2012 Nicira Networks
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import uuid
16
17 import ovs.jsonrpc
18 import ovs.db.parser
19 import ovs.db.schema
20 from ovs.db import error
21 import ovs.ovsuuid
22 import ovs.poller
23 import ovs.vlog
24
25 vlog = ovs.vlog.Vlog("idl")
26
27 __pychecker__ = 'no-classattr no-objattrs'
28
29
30 class Idl:
31     """Open vSwitch Database Interface Definition Language (OVSDB IDL).
32
33     The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC
34     requests to an OVSDB database server and parses the responses, converting
35     raw JSON into data structures that are easier for clients to digest.
36
37     The IDL also assists with issuing database transactions.  The client
38     creates a transaction, manipulates the IDL data structures, and commits or
39     aborts the transaction.  The IDL then composes and issues the necessary
40     JSON-RPC requests and reports to the client whether the transaction
41     completed successfully.
42
43     The client is allowed to access the following attributes directly, in a
44     read-only fashion:
45
46     - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
47       to the Idl constructor.  Each ovs.db.schema.TableSchema in the map is
48       annotated with a new attribute 'rows', which is a dict from a uuid.UUID
49       to a Row object.
50
51       The client may directly read and write the Row objects referenced by the
52       'rows' map values.  Refer to Row for more details.
53
54     - 'change_seqno': A number that represents the IDL's state.  When the IDL
55       is updated (by Idl.run()), its value changes.
56
57     - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
58       if no lock is configured.
59
60     - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
61       lock, and False otherwise.
62
63       Locking and unlocking happens asynchronously from the database client's
64       point of view, so the information is only useful for optimization
65       (e.g. if the client doesn't have the lock then there's no point in trying
66       to write to the database).
67
68     - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
69       the database server has indicated that some other client already owns the
70       requested lock, and False otherwise.
71
72     - 'txn': The ovs.db.idl.Transaction object for the database transaction
73       currently being constructed, if there is one, or None otherwise.
74 """
75
76     def __init__(self, remote, schema):
77         """Creates and returns a connection to the database named 'db_name' on
78         'remote', which should be in a form acceptable to
79         ovs.jsonrpc.session.open().  The connection will maintain an in-memory
80         replica of the remote database.
81
82         'schema' should be the schema for the remote database.  The caller may
83         have cut it down by removing tables or columns that are not of
84         interest.  The IDL will only replicate the tables and columns that
85         remain.  The caller may also add a attribute named 'alert' to selected
86         remaining columns, setting its value to False; if so, then changes to
87         those columns will not be considered changes to the database for the
88         purpose of the return value of Idl.run() and Idl.change_seqno.  This is
89         useful for columns that the IDL's client will write but not read.
90
91         As a convenience to users, 'schema' may also be an instance of the
92         SchemaHelper class.
93
94         The IDL uses and modifies 'schema' directly."""
95
96         assert isinstance(schema, SchemaHelper)
97         schema = schema.get_idl_schema()
98
99         self.tables = schema.tables
100         self._db = schema
101         self._session = ovs.jsonrpc.Session.open(remote)
102         self._monitor_request_id = None
103         self._last_seqno = None
104         self.change_seqno = 0
105
106         # Database locking.
107         self.lock_name = None          # Name of lock we need, None if none.
108         self.has_lock = False          # Has db server said we have the lock?
109         self.is_lock_contended = False  # Has db server said we can't get lock?
110         self._lock_request_id = None   # JSON-RPC ID of in-flight lock request.
111
112         # Transaction support.
113         self.txn = None
114         self._outstanding_txns = {}
115
116         for table in schema.tables.itervalues():
117             for column in table.columns.itervalues():
118                 if not hasattr(column, 'alert'):
119                     column.alert = True
120             table.need_table = False
121             table.rows = {}
122             table.idl = self
123
124     def close(self):
125         """Closes the connection to the database.  The IDL will no longer
126         update."""
127         self._session.close()
128
129     def run(self):
130         """Processes a batch of messages from the database server.  Returns
131         True if the database as seen through the IDL changed, False if it did
132         not change.  The initial fetch of the entire contents of the remote
133         database is considered to be one kind of change.  If the IDL has been
134         configured to acquire a database lock (with Idl.set_lock()), then
135         successfully acquiring the lock is also considered to be a change.
136
137         This function can return occasional false positives, that is, report
138         that the database changed even though it didn't.  This happens if the
139         connection to the database drops and reconnects, which causes the
140         database contents to be reloaded even if they didn't change.  (It could
141         also happen if the database server sends out a "change" that reflects
142         what we already thought was in the database, but the database server is
143         not supposed to do that.)
144
145         As an alternative to checking the return value, the client may check
146         for changes in self.change_seqno."""
147         assert not self.txn
148         initial_change_seqno = self.change_seqno
149         self._session.run()
150         i = 0
151         while i < 50:
152             i += 1
153             if not self._session.is_connected():
154                 break
155
156             seqno = self._session.get_seqno()
157             if seqno != self._last_seqno:
158                 self._last_seqno = seqno
159                 self.__txn_abort_all()
160                 self.__send_monitor_request()
161                 if self.lock_name:
162                     self.__send_lock_request()
163                 break
164
165             msg = self._session.recv()
166             if msg is None:
167                 break
168             if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
169                 and msg.method == "update"
170                 and len(msg.params) == 2
171                 and msg.params[0] == None):
172                 # Database contents changed.
173                 self.__parse_update(msg.params[1])
174             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
175                   and self._monitor_request_id is not None
176                   and self._monitor_request_id == msg.id):
177                 # Reply to our "monitor" request.
178                 try:
179                     self.change_seqno += 1
180                     self._monitor_request_id = None
181                     self.__clear()
182                     self.__parse_update(msg.result)
183                 except error.Error, e:
184                     vlog.err("%s: parse error in received schema: %s"
185                               % (self._session.get_name(), e))
186                     self.__error()
187             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
188                   and self._lock_request_id is not None
189                   and self._lock_request_id == msg.id):
190                 # Reply to our "lock" request.
191                 self.__parse_lock_reply(msg.result)
192             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
193                   and msg.method == "locked"):
194                 # We got our lock.
195                 self.__parse_lock_notify(msg.params, True)
196             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
197                   and msg.method == "stolen"):
198                 # Someone else stole our lock.
199                 self.__parse_lock_notify(msg.params, False)
200             elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
201                 # Reply to our echo request.  Ignore it.
202                 pass
203             elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
204                                ovs.jsonrpc.Message.T_REPLY)
205                   and self.__txn_process_reply(msg)):
206                 # __txn_process_reply() did everything needed.
207                 pass
208             else:
209                 # This can happen if a transaction is destroyed before we
210                 # receive the reply, so keep the log level low.
211                 vlog.dbg("%s: received unexpected %s message"
212                          % (self._session.get_name(),
213                              ovs.jsonrpc.Message.type_to_string(msg.type)))
214
215         return initial_change_seqno != self.change_seqno
216
217     def wait(self, poller):
218         """Arranges for poller.block() to wake up when self.run() has something
219         to do or when activity occurs on a transaction on 'self'."""
220         self._session.wait(poller)
221         self._session.recv_wait(poller)
222
223     def has_ever_connected(self):
224         """Returns True, if the IDL successfully connected to the remote
225         database and retrieved its contents (even if the connection
226         subsequently dropped and is in the process of reconnecting).  If so,
227         then the IDL contains an atomic snapshot of the database's contents
228         (but it might be arbitrarily old if the connection dropped).
229
230         Returns False if the IDL has never connected or retrieved the
231         database's contents.  If so, the IDL is empty."""
232         return self.change_seqno != 0
233
234     def force_reconnect(self):
235         """Forces the IDL to drop its connection to the database and reconnect.
236         In the meantime, the contents of the IDL will not change."""
237         self._session.force_reconnect()
238
239     def set_lock(self, lock_name):
240         """If 'lock_name' is not None, configures the IDL to obtain the named
241         lock from the database server and to avoid modifying the database when
242         the lock cannot be acquired (that is, when another client has the same
243         lock).
244
245         If 'lock_name' is None, drops the locking requirement and releases the
246         lock."""
247         assert not self.txn
248         assert not self._outstanding_txns
249
250         if self.lock_name and (not lock_name or lock_name != self.lock_name):
251             # Release previous lock.
252             self.__send_unlock_request()
253             self.lock_name = None
254             self.is_lock_contended = False
255
256         if lock_name and not self.lock_name:
257             # Acquire new lock.
258             self.lock_name = lock_name
259             self.__send_lock_request()
260
261     def __clear(self):
262         changed = False
263
264         for table in self.tables.itervalues():
265             if table.rows:
266                 changed = True
267                 table.rows = {}
268
269         if changed:
270             self.change_seqno += 1
271
272     def __update_has_lock(self, new_has_lock):
273         if new_has_lock and not self.has_lock:
274             if self._monitor_request_id is None:
275                 self.change_seqno += 1
276             else:
277                 # We're waiting for a monitor reply, so don't signal that the
278                 # database changed.  The monitor reply will increment
279                 # change_seqno anyhow.
280                 pass
281             self.is_lock_contended = False
282         self.has_lock = new_has_lock
283
284     def __do_send_lock_request(self, method):
285         self.__update_has_lock(False)
286         self._lock_request_id = None
287         if self._session.is_connected():
288             msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
289             msg_id = msg.id
290             self._session.send(msg)
291         else:
292             msg_id = None
293         return msg_id
294
295     def __send_lock_request(self):
296         self._lock_request_id = self.__do_send_lock_request("lock")
297
298     def __send_unlock_request(self):
299         self.__do_send_lock_request("unlock")
300
301     def __parse_lock_reply(self, result):
302         self._lock_request_id = None
303         got_lock = type(result) == dict and result.get("locked") is True
304         self.__update_has_lock(got_lock)
305         if not got_lock:
306             self.is_lock_contended = True
307
308     def __parse_lock_notify(self, params, new_has_lock):
309         if (self.lock_name is not None
310             and type(params) in (list, tuple)
311             and params
312             and params[0] == self.lock_name):
313             self.__update_has_lock(self, new_has_lock)
314             if not new_has_lock:
315                 self.is_lock_contended = True
316
317     def __send_monitor_request(self):
318         monitor_requests = {}
319         for table in self.tables.itervalues():
320             monitor_requests[table.name] = {"columns": table.columns.keys()}
321         msg = ovs.jsonrpc.Message.create_request(
322             "monitor", [self._db.name, None, monitor_requests])
323         self._monitor_request_id = msg.id
324         self._session.send(msg)
325
326     def __parse_update(self, update):
327         try:
328             self.__do_parse_update(update)
329         except error.Error, e:
330             vlog.err("%s: error parsing update: %s"
331                      % (self._session.get_name(), e))
332
333     def __do_parse_update(self, table_updates):
334         if type(table_updates) != dict:
335             raise error.Error("<table-updates> is not an object",
336                               table_updates)
337
338         for table_name, table_update in table_updates.iteritems():
339             table = self.tables.get(table_name)
340             if not table:
341                 raise error.Error('<table-updates> includes unknown '
342                                   'table "%s"' % table_name)
343
344             if type(table_update) != dict:
345                 raise error.Error('<table-update> for table "%s" is not '
346                                   'an object' % table_name, table_update)
347
348             for uuid_string, row_update in table_update.iteritems():
349                 if not ovs.ovsuuid.is_valid_string(uuid_string):
350                     raise error.Error('<table-update> for table "%s" '
351                                       'contains bad UUID "%s" as member '
352                                       'name' % (table_name, uuid_string),
353                                       table_update)
354                 uuid = ovs.ovsuuid.from_string(uuid_string)
355
356                 if type(row_update) != dict:
357                     raise error.Error('<table-update> for table "%s" '
358                                       'contains <row-update> for %s that '
359                                       'is not an object'
360                                       % (table_name, uuid_string))
361
362                 parser = ovs.db.parser.Parser(row_update, "row-update")
363                 old = parser.get_optional("old", [dict])
364                 new = parser.get_optional("new", [dict])
365                 parser.finish()
366
367                 if not old and not new:
368                     raise error.Error('<row-update> missing "old" and '
369                                       '"new" members', row_update)
370
371                 if self.__process_update(table, uuid, old, new):
372                     self.change_seqno += 1
373
374     def __process_update(self, table, uuid, old, new):
375         """Returns True if a column changed, False otherwise."""
376         row = table.rows.get(uuid)
377         changed = False
378         if not new:
379             # Delete row.
380             if row:
381                 del table.rows[uuid]
382                 changed = True
383             else:
384                 # XXX rate-limit
385                 vlog.warn("cannot delete missing row %s from table %s"
386                           % (uuid, table.name))
387         elif not old:
388             # Insert row.
389             if not row:
390                 row = self.__create_row(table, uuid)
391                 changed = True
392             else:
393                 # XXX rate-limit
394                 vlog.warn("cannot add existing row %s to table %s"
395                           % (uuid, table.name))
396             if self.__row_update(table, row, new):
397                 changed = True
398         else:
399             if not row:
400                 row = self.__create_row(table, uuid)
401                 changed = True
402                 # XXX rate-limit
403                 vlog.warn("cannot modify missing row %s in table %s"
404                           % (uuid, table.name))
405             if self.__row_update(table, row, new):
406                 changed = True
407         return changed
408
409     def __row_update(self, table, row, row_json):
410         changed = False
411         for column_name, datum_json in row_json.iteritems():
412             column = table.columns.get(column_name)
413             if not column:
414                 # XXX rate-limit
415                 vlog.warn("unknown column %s updating table %s"
416                           % (column_name, table.name))
417                 continue
418
419             try:
420                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
421             except error.Error, e:
422                 # XXX rate-limit
423                 vlog.warn("error parsing column %s in table %s: %s"
424                           % (column_name, table.name, e))
425                 continue
426
427             if datum != row._data[column_name]:
428                 row._data[column_name] = datum
429                 if column.alert:
430                     changed = True
431             else:
432                 # Didn't really change but the OVSDB monitor protocol always
433                 # includes every value in a row.
434                 pass
435         return changed
436
437     def __create_row(self, table, uuid):
438         data = {}
439         for column in table.columns.itervalues():
440             data[column.name] = ovs.db.data.Datum.default(column.type)
441         row = table.rows[uuid] = Row(self, table, uuid, data)
442         return row
443
444     def __error(self):
445         self._session.force_reconnect()
446
447     def __txn_abort_all(self):
448         while self._outstanding_txns:
449             txn = self._outstanding_txns.popitem()[1]
450             txn._status = Transaction.AGAIN_WAIT
451
452     def __txn_process_reply(self, msg):
453         txn = self._outstanding_txns.pop(msg.id, None)
454         if txn:
455             txn._process_reply(msg)
456
457
458 def _uuid_to_row(atom, base):
459     if base.ref_table:
460         return base.ref_table.rows.get(atom)
461     else:
462         return atom
463
464
465 def _row_to_uuid(value):
466     if type(value) == Row:
467         return value.uuid
468     else:
469         return value
470
471
472 class Row(object):
473     """A row within an IDL.
474
475     The client may access the following attributes directly:
476
477     - 'uuid': a uuid.UUID object whose value is the row's database UUID.
478
479     - An attribute for each column in the Row's table, named for the column,
480       whose values are as returned by Datum.to_python() for the column's type.
481
482       If some error occurs (e.g. the database server's idea of the column is
483       different from the IDL's idea), then the attribute values is the
484       "default" value return by Datum.default() for the column's type.  (It is
485       important to know this because the default value may violate constraints
486       for the column's type, e.g. the default integer value is 0 even if column
487       contraints require the column's value to be positive.)
488
489       When a transaction is active, column attributes may also be assigned new
490       values.  Committing the transaction will then cause the new value to be
491       stored into the database.
492
493       *NOTE*: In the current implementation, the value of a column is a *copy*
494       of the value in the database.  This means that modifying its value
495       directly will have no useful effect.  For example, the following:
496         row.mycolumn["a"] = "b"              # don't do this
497       will not change anything in the database, even after commit.  To modify
498       the column, instead assign the modified column value back to the column:
499         d = row.mycolumn
500         d["a"] = "b"
501         row.mycolumn = d
502 """
503     def __init__(self, idl, table, uuid, data):
504         # All of the explicit references to self.__dict__ below are required
505         # to set real attributes with invoking self.__getattr__().
506         self.__dict__["uuid"] = uuid
507
508         self.__dict__["_idl"] = idl
509         self.__dict__["_table"] = table
510
511         # _data is the committed data.  It takes the following values:
512         #
513         #   - A dictionary that maps every column name to a Datum, if the row
514         #     exists in the committed form of the database.
515         #
516         #   - None, if this row is newly inserted within the active transaction
517         #     and thus has no committed form.
518         self.__dict__["_data"] = data
519
520         # _changes describes changes to this row within the active transaction.
521         # It takes the following values:
522         #
523         #   - {}, the empty dictionary, if no transaction is active or if the
524         #     row has yet not been changed within this transaction.
525         #
526         #   - A dictionary that maps a column name to its new Datum, if an
527         #     active transaction changes those columns' values.
528         #
529         #   - A dictionary that maps every column name to a Datum, if the row
530         #     is newly inserted within the active transaction.
531         #
532         #   - None, if this transaction deletes this row.
533         self.__dict__["_changes"] = {}
534
535         # A dictionary whose keys are the names of columns that must be
536         # verified as prerequisites when the transaction commits.  The values
537         # in the dictionary are all None.
538         self.__dict__["_prereqs"] = {}
539
540     def __getattr__(self, column_name):
541         assert self._changes is not None
542
543         datum = self._changes.get(column_name)
544         if datum is None:
545             datum = self._data[column_name]
546
547         return datum.to_python(_uuid_to_row)
548
549     def __setattr__(self, column_name, value):
550         assert self._changes is not None
551         assert self._idl.txn
552
553         column = self._table.columns[column_name]
554         try:
555             datum = ovs.db.data.Datum.from_python(column.type, value,
556                                                   _row_to_uuid)
557         except error.Error, e:
558             # XXX rate-limit
559             vlog.err("attempting to write bad value to column %s (%s)"
560                      % (column_name, e))
561             return
562         self._idl.txn._write(self, column, datum)
563
564     def verify(self, column_name):
565         """Causes the original contents of column 'column_name' in this row to
566         be verified as a prerequisite to completing the transaction.  That is,
567         if 'column_name' changed in this row (or if this row was deleted)
568         between the time that the IDL originally read its contents and the time
569         that the transaction commits, then the transaction aborts and
570         Transaction.commit() returns Transaction.AGAIN_WAIT or
571         Transaction.AGAIN_NOW (depending on whether the database change has
572         already been received).
573
574         The intention is that, to ensure that no transaction commits based on
575         dirty reads, an application should call Row.verify() on each data item
576         read as part of a read-modify-write operation.
577
578         In some cases Row.verify() reduces to a no-op, because the current
579         value of the column is already known:
580
581           - If this row is a row created by the current transaction (returned
582             by Transaction.insert()).
583
584           - If the column has already been modified within the current
585             transaction.
586
587         Because of the latter property, always call Row.verify() *before*
588         modifying the column, for a given read-modify-write.
589
590         A transaction must be in progress."""
591         assert self._idl.txn
592         assert self._changes is not None
593         if not self._data or column_name in self._changes:
594             return
595
596         self._prereqs[column_name] = None
597
598     def delete(self):
599         """Deletes this row from its table.
600
601         A transaction must be in progress."""
602         assert self._idl.txn
603         assert self._changes is not None
604         if self._data is None:
605             del self._idl.txn._txn_rows[self.uuid]
606         self.__dict__["_changes"] = None
607         del self._table.rows[self.uuid]
608
609
610 def _uuid_name_from_uuid(uuid):
611     return "row%s" % str(uuid).replace("-", "_")
612
613
614 def _where_uuid_equals(uuid):
615     return [["_uuid", "==", ["uuid", str(uuid)]]]
616
617
618 class _InsertedRow(object):
619     def __init__(self, op_index):
620         self.op_index = op_index
621         self.real = None
622
623
624 class Transaction(object):
625     # Status values that Transaction.commit() can return.
626     UNCOMMITTED = "uncommitted"  # Not yet committed or aborted.
627     UNCHANGED = "unchanged"      # Transaction didn't include any changes.
628     INCOMPLETE = "incomplete"    # Commit in progress, please wait.
629     ABORTED = "aborted"          # ovsdb_idl_txn_abort() called.
630     SUCCESS = "success"          # Commit successful.
631     AGAIN_WAIT = "wait then try again"
632                                  # Commit failed because a "verify" operation
633                                  # reported an inconsistency, due to a network
634                                  # problem, or other transient failure.  Wait
635                                  # for a change, then try again.
636     AGAIN_NOW = "try again now"  # Same as AGAIN_WAIT but try again right away.
637     NOT_LOCKED = "not locked"    # Server hasn't given us the lock yet.
638     ERROR = "error"              # Commit failed due to a hard error.
639
640     @staticmethod
641     def status_to_string(status):
642         """Converts one of the status values that Transaction.commit() can
643         return into a human-readable string.
644
645         (The status values are in fact such strings already, so
646         there's nothing to do.)"""
647         return status
648
649     def __init__(self, idl):
650         """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
651         A given Idl may only have a single active transaction at a time.
652
653         A Transaction may modify the contents of a database by assigning new
654         values to columns (attributes of Row), deleting rows (with
655         Row.delete()), or inserting rows (with Transaction.insert()).  It may
656         also check that columns in the database have not changed with
657         Row.verify().
658
659         When a transaction is complete (which must be before the next call to
660         Idl.run()), call Transaction.commit() or Transaction.abort()."""
661         assert idl.txn is None
662
663         idl.txn = self
664         self._request_id = None
665         self.idl = idl
666         self.dry_run = False
667         self._txn_rows = {}
668         self._status = Transaction.UNCOMMITTED
669         self._error = None
670         self._comments = []
671         self._commit_seqno = self.idl.change_seqno
672
673         self._inc_table = None
674         self._inc_column = None
675         self._inc_where = None
676
677         self._inserted_rows = {}  # Map from UUID to _InsertedRow
678
679     def add_comment(self, comment):
680         """Appens 'comment' to the comments that will be passed to the OVSDB
681         server when this transaction is committed.  (The comment will be
682         committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
683         relatively human-readable form.)"""
684         self._comments.append(comment)
685
686     def increment(self, table, column, where):
687         assert not self._inc_table
688         self._inc_table = table
689         self._inc_column = column
690         self._inc_where = where
691
692     def wait(self, poller):
693         if self._status not in (Transaction.UNCOMMITTED,
694                                 Transaction.INCOMPLETE):
695             poller.immediate_wake()
696
697     def _substitute_uuids(self, json):
698         if type(json) in (list, tuple):
699             if (len(json) == 2
700                 and json[0] == 'uuid'
701                 and ovs.ovsuuid.is_valid_string(json[1])):
702                 uuid = ovs.ovsuuid.from_string(json[1])
703                 row = self._txn_rows.get(uuid, None)
704                 if row and row._data is None:
705                     return ["named-uuid", _uuid_name_from_uuid(uuid)]
706         return json
707
708     def __disassemble(self):
709         self.idl.txn = None
710
711         for row in self._txn_rows.itervalues():
712             if row._changes is None:
713                 row._table.rows[row.uuid] = row
714             elif row._data is None:
715                 del row._table.rows[row.uuid]
716             row.__dict__["_changes"] = {}
717             row.__dict__["_prereqs"] = {}
718         self._txn_rows = {}
719
720     def commit(self):
721         """Attempts to commit this transaction and returns the status of the
722         commit operation, one of the constants declared as class attributes.
723         If the return value is Transaction.INCOMPLETE, then the transaction is
724         not yet complete and the caller should try calling again later, after
725         calling Idl.run() to run the Idl.
726
727         Committing a transaction rolls back all of the changes that it made to
728         the Idl's copy of the database.  If the transaction commits
729         successfully, then the database server will send an update and, thus,
730         the Idl will be updated with the committed changes."""
731         # The status can only change if we're the active transaction.
732         # (Otherwise, our status will change only in Idl.run().)
733         if self != self.idl.txn:
734             return self._status
735
736         # If we need a lock but don't have it, give up quickly.
737         if self.idl.lock_name and not self.idl.has_lock():
738             self._status = Transaction.NOT_LOCKED
739             self.__disassemble()
740             return self._status
741
742         operations = [self.idl._db.name]
743
744         # Assert that we have the required lock (avoiding a race).
745         if self.idl.lock_name:
746             operations.append({"op": "assert",
747                                "lock": self.idl.lock_name})
748
749         # Add prerequisites and declarations of new rows.
750         for row in self._txn_rows.itervalues():
751             if row._prereqs:
752                 rows = {}
753                 columns = []
754                 for column_name in row._prereqs:
755                     columns.append(column_name)
756                     rows[column_name] = row._data[column_name].to_json()
757                 operations.append({"op": "wait",
758                                    "table": row._table.name,
759                                    "timeout": 0,
760                                    "where": _where_uuid_equals(row.uuid),
761                                    "until": "==",
762                                    "columns": columns,
763                                    "rows": [rows]})
764
765         # Add updates.
766         any_updates = False
767         for row in self._txn_rows.itervalues():
768             if row._changes is None:
769                 if row._table.is_root:
770                     operations.append({"op": "delete",
771                                        "table": row._table.name,
772                                        "where": _where_uuid_equals(row.uuid)})
773                     any_updates = True
774                 else:
775                     # Let ovsdb-server decide whether to really delete it.
776                     pass
777             elif row._changes:
778                 op = {"table": row._table.name}
779                 if row._data is None:
780                     op["op"] = "insert"
781                     op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
782                     any_updates = True
783
784                     op_index = len(operations) - 1
785                     self._inserted_rows[row.uuid] = _InsertedRow(op_index)
786                 else:
787                     op["op"] = "update"
788                     op["where"] = _where_uuid_equals(row.uuid)
789
790                 row_json = {}
791                 op["row"] = row_json
792
793                 for column_name, datum in row._changes.iteritems():
794                     if row._data is not None or not datum.is_default():
795                         row_json[column_name] = (
796                                 self._substitute_uuids(datum.to_json()))
797
798                         # If anything really changed, consider it an update.
799                         # We can't suppress not-really-changed values earlier
800                         # or transactions would become nonatomic (see the big
801                         # comment inside Transaction._write()).
802                         if (not any_updates and row._data is not None and
803                             row._data[column_name] != datum):
804                             any_updates = True
805
806                 if row._data is None or row_json:
807                     operations.append(op)
808
809         # Add increment.
810         if self._inc_table and any_updates:
811             self._inc_index = len(operations) - 1
812
813             operations.append({"op": "mutate",
814                                "table": self._inc_table,
815                                "where": self._substitute_uuids(
816                                    self._inc_where),
817                                "mutations": [[self._inc_column, "+=", 1]]})
818             operations.append({"op": "select",
819                                "table": self._inc_table,
820                                "where": self._substitute_uuids(
821                                    self._inc_where),
822                                "columns": [self._inc_column]})
823
824         # Add comment.
825         if self._comments:
826             operations.append({"op": "comment",
827                                "comment": "\n".join(self._comments)})
828
829         # Dry run?
830         if self.dry_run:
831             operations.append({"op": "abort"})
832
833         if not any_updates:
834             self._status = Transaction.UNCHANGED
835         else:
836             msg = ovs.jsonrpc.Message.create_request("transact", operations)
837             self._request_id = msg.id
838             if not self.idl._session.send(msg):
839                 self.idl._outstanding_txns[self._request_id] = self
840                 self._status = Transaction.INCOMPLETE
841             else:
842                 self._status = Transaction.AGAIN_WAIT
843
844         self.__disassemble()
845         return self._status
846
847     def commit_block(self):
848         while True:
849             status = self.commit()
850             if status != Transaction.INCOMPLETE:
851                 return status
852
853             self.idl.run()
854
855             poller = ovs.poller.Poller()
856             self.idl.wait(poller)
857             self.wait(poller)
858             poller.block()
859
860     def get_increment_new_value(self):
861         assert self._status == Transaction.SUCCESS
862         return self._inc_new_value
863
864     def abort(self):
865         """Aborts this transaction.  If Transaction.commit() has already been
866         called then the transaction might get committed anyhow."""
867         self.__disassemble()
868         if self._status in (Transaction.UNCOMMITTED,
869                             Transaction.INCOMPLETE):
870             self._status = Transaction.ABORTED
871
872     def get_error(self):
873         """Returns a string representing this transaction's current status,
874         suitable for use in log messages."""
875         if self._status != Transaction.ERROR:
876             return Transaction.status_to_string(self._status)
877         elif self._error:
878             return self._error
879         else:
880             return "no error details available"
881
882     def __set_error_json(self, json):
883         if self._error is None:
884             self._error = ovs.json.to_string(json)
885
886     def get_insert_uuid(self, uuid):
887         """Finds and returns the permanent UUID that the database assigned to a
888         newly inserted row, given the UUID that Transaction.insert() assigned
889         locally to that row.
890
891         Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
892         or if it was assigned by that function and then deleted by Row.delete()
893         within the same transaction.  (Rows that are inserted and then deleted
894         within a single transaction are never sent to the database server, so
895         it never assigns them a permanent UUID.)
896
897         This transaction must have completed successfully."""
898         assert self._status in (Transaction.SUCCESS,
899                                 Transaction.UNCHANGED)
900         inserted_row = self._inserted_rows.get(uuid)
901         if inserted_row:
902             return inserted_row.real
903         return None
904
905     def _write(self, row, column, datum):
906         assert row._changes is not None
907
908         txn = row._idl.txn
909
910         # If this is a write-only column and the datum being written is the
911         # same as the one already there, just skip the update entirely.  This
912         # is worth optimizing because we have a lot of columns that get
913         # periodically refreshed into the database but don't actually change
914         # that often.
915         #
916         # We don't do this for read/write columns because that would break
917         # atomicity of transactions--some other client might have written a
918         # different value in that column since we read it.  (But if a whole
919         # transaction only does writes of existing values, without making any
920         # real changes, we will drop the whole transaction later in
921         # ovsdb_idl_txn_commit().)
922         if not column.alert and row._data.get(column.name) == datum:
923             new_value = row._changes.get(column.name)
924             if new_value is None or new_value == datum:
925                 return
926
927         txn._txn_rows[row.uuid] = row
928         row._changes[column.name] = datum.copy()
929
930     def insert(self, table, new_uuid=None):
931         """Inserts and returns a new row in 'table', which must be one of the
932         ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
933
934         The new row is assigned a provisional UUID.  If 'uuid' is None then one
935         is randomly generated; otherwise 'uuid' should specify a randomly
936         generated uuid.UUID not otherwise in use.  ovsdb-server will assign a
937         different UUID when 'txn' is committed, but the IDL will replace any
938         uses of the provisional UUID in the data to be to be committed by the
939         UUID assigned by ovsdb-server."""
940         assert self._status == Transaction.UNCOMMITTED
941         if new_uuid is None:
942             new_uuid = uuid.uuid4()
943         row = Row(self.idl, table, new_uuid, None)
944         table.rows[row.uuid] = row
945         self._txn_rows[row.uuid] = row
946         return row
947
948     def _process_reply(self, msg):
949         if msg.type == ovs.jsonrpc.Message.T_ERROR:
950             self._status = Transaction.ERROR
951         elif type(msg.result) not in (list, tuple):
952             # XXX rate-limit
953             vlog.warn('reply to "transact" is not JSON array')
954         else:
955             hard_errors = False
956             soft_errors = False
957             lock_errors = False
958
959             ops = msg.result
960             for op in ops:
961                 if op is None:
962                     # This isn't an error in itself but indicates that some
963                     # prior operation failed, so make sure that we know about
964                     # it.
965                     soft_errors = True
966                 elif type(op) == dict:
967                     error = op.get("error")
968                     if error is not None:
969                         if error == "timed out":
970                             soft_errors = True
971                         elif error == "not owner":
972                             lock_errors = True
973                         elif error == "aborted":
974                             pass
975                         else:
976                             hard_errors = True
977                             self.__set_error_json(op)
978                 else:
979                     hard_errors = True
980                     self.__set_error_json(op)
981                     # XXX rate-limit
982                     vlog.warn("operation reply is not JSON null or object")
983
984             if not soft_errors and not hard_errors and not lock_errors:
985                 if self._inc_table and not self.__process_inc_reply(ops):
986                     hard_errors = True
987
988                 for insert in self._inserted_rows.itervalues():
989                     if not self.__process_insert_reply(insert, ops):
990                         hard_errors = True
991
992             if hard_errors:
993                 self._status = Transaction.ERROR
994             elif lock_errors:
995                 self._status = Transaction.NOT_LOCKED
996             elif soft_errors:
997                 if self._commit_seqno == self.idl.change_seqno:
998                     self._status = Transaction.AGAIN_WAIT
999                 else:
1000                     self._status = Transaction.AGAIN_NOW
1001             else:
1002                 self._status = Transaction.SUCCESS
1003
1004     @staticmethod
1005     def __check_json_type(json, types, name):
1006         if not json:
1007             # XXX rate-limit
1008             vlog.warn("%s is missing" % name)
1009             return False
1010         elif type(json) not in types:
1011             # XXX rate-limit
1012             vlog.warn("%s has unexpected type %s" % (name, type(json)))
1013             return False
1014         else:
1015             return True
1016
1017     def __process_inc_reply(self, ops):
1018         if self._inc_index + 2 > len(ops):
1019             # XXX rate-limit
1020             vlog.warn("reply does not contain enough operations for "
1021                       "increment (has %d, needs %d)" %
1022                       (len(ops), self._inc_index + 2))
1023
1024         # We know that this is a JSON object because the loop in
1025         # __process_reply() already checked.
1026         mutate = ops[self._inc_index]
1027         count = mutate.get("count")
1028         if not Transaction.__check_json_type(count, (int, long),
1029                                              '"mutate" reply "count"'):
1030             return False
1031         if count != 1:
1032             # XXX rate-limit
1033             vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
1034             return False
1035
1036         select = ops[self._inc_index + 1]
1037         rows = select.get("rows")
1038         if not Transaction.__check_json_type(rows, (list, tuple),
1039                                              '"select" reply "rows"'):
1040             return False
1041         if len(rows) != 1:
1042             # XXX rate-limit
1043             vlog.warn('"select" reply "rows" has %d elements '
1044                       'instead of 1' % len(rows))
1045             return False
1046         row = rows[0]
1047         if not Transaction.__check_json_type(row, (dict,),
1048                                              '"select" reply row'):
1049             return False
1050         column = row.get(self._inc_column)
1051         if not Transaction.__check_json_type(column, (int, long),
1052                                              '"select" reply inc column'):
1053             return False
1054         self._inc_new_value = column
1055         return True
1056
1057     def __process_insert_reply(self, insert, ops):
1058         if insert.op_index >= len(ops):
1059             # XXX rate-limit
1060             vlog.warn("reply does not contain enough operations "
1061                       "for insert (has %d, needs %d)"
1062                       % (len(ops), insert.op_index))
1063             return False
1064
1065         # We know that this is a JSON object because the loop in
1066         # __process_reply() already checked.
1067         reply = ops[insert.op_index]
1068         json_uuid = reply.get("uuid")
1069         if not Transaction.__check_json_type(json_uuid, (tuple, list),
1070                                              '"insert" reply "uuid"'):
1071             return False
1072
1073         try:
1074             uuid_ = ovs.ovsuuid.from_json(json_uuid)
1075         except error.Error:
1076             # XXX rate-limit
1077             vlog.warn('"insert" reply "uuid" is not a JSON UUID')
1078             return False
1079
1080         insert.real = uuid_
1081         return True
1082
1083
1084 class SchemaHelper(object):
1085     """IDL Schema helper.
1086
1087     This class encapsulates the logic required to generate schemas suitable
1088     for creating 'ovs.db.idl.Idl' objects.  Clients should register columns
1089     they are interested in using register_columns().  When finished, the
1090     get_idl_schema() function may be called.
1091
1092     The location on disk of the schema used may be found in the
1093     'schema_location' variable."""
1094
1095     def __init__(self, location=None):
1096         """Creates a new Schema object."""
1097
1098         if location is None:
1099             location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
1100
1101         self.schema_location = location
1102         self._tables = {}
1103         self._all = False
1104
1105     def register_columns(self, table, columns):
1106         """Registers interest in the given 'columns' of 'table'.  Future calls
1107         to get_idl_schema() will include 'table':column for each column in
1108         'columns'. This function automatically avoids adding duplicate entries
1109         to the schema.
1110
1111         'table' must be a string.
1112         'columns' must be a list of strings.
1113         """
1114
1115         assert type(table) is str
1116         assert type(columns) is list
1117
1118         columns = set(columns) | self._tables.get(table, set())
1119         self._tables[table] = columns
1120
1121     def register_all(self):
1122         """Registers interest in every column of every table."""
1123         self._all = True
1124
1125     def get_idl_schema(self):
1126         """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
1127         object based on columns registered using the register_columns()
1128         function."""
1129
1130         schema = ovs.db.schema.DbSchema.from_json(
1131             ovs.json.from_file(self.schema_location))
1132
1133         if not self._all:
1134             schema_tables = {}
1135             for table, columns in self._tables.iteritems():
1136                 schema_tables[table] = (
1137                     self._keep_table_columns(schema, table, columns))
1138
1139             schema.tables = schema_tables
1140         return schema
1141
1142     def _keep_table_columns(self, schema, table_name, columns):
1143         assert table_name in schema.tables
1144         table = schema.tables[table_name]
1145
1146         new_columns = {}
1147         for column_name in columns:
1148             assert type(column_name) is str
1149             assert column_name in table.columns
1150
1151             new_columns[column_name] = table.columns[column_name]
1152
1153         table.columns = new_columns
1154         return table