Rearrange structures to better fit valgrind's memory leak heuristics.
[sliver-openvswitch.git] / ovsdb / jsonrpc-server.c
1 /* Copyright (c) 2009, 2010, 2011, 2012 Nicira Networks
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "jsonrpc-server.h"
19
20 #include <assert.h>
21 #include <errno.h>
22
23 #include "bitmap.h"
24 #include "column.h"
25 #include "dynamic-string.h"
26 #include "json.h"
27 #include "jsonrpc.h"
28 #include "ovsdb-error.h"
29 #include "ovsdb-parser.h"
30 #include "ovsdb.h"
31 #include "reconnect.h"
32 #include "row.h"
33 #include "server.h"
34 #include "stream.h"
35 #include "table.h"
36 #include "timeval.h"
37 #include "transaction.h"
38 #include "trigger.h"
39 #include "vlog.h"
40
41 VLOG_DEFINE_THIS_MODULE(ovsdb_jsonrpc_server);
42
43 struct ovsdb_jsonrpc_remote;
44 struct ovsdb_jsonrpc_session;
45
46 /* Message rate-limiting. */
47 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
48
49 /* Sessions. */
/* Forward declarations: creating a session on a remote, and the helpers that
 * apply one operation to every session in a remote's 'sessions' list. */
static struct ovsdb_jsonrpc_session *
ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
                             struct jsonrpc_session *js);
static void
ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote);
static void
ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *remote);
static void
ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *remote);
static void
ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *remote);
static void
ovsdb_jsonrpc_session_set_all_options(
    struct ovsdb_jsonrpc_remote *remote,
    const struct ovsdb_jsonrpc_options *options);
static bool
ovsdb_jsonrpc_session_get_status(const struct ovsdb_jsonrpc_remote *remote,
                                 struct ovsdb_jsonrpc_remote_status *status);

/* Forward declarations: releasing the lock waiters held by a session. */
static void
ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *s);
static void
ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *waiter);
63
64 /* Triggers. */
/* Forward declarations for the per-session trigger table.  Triggers are
 * looked up by JSON-RPC request 'id' together with its precomputed hash. */
static void
ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s,
                             struct json *id, struct json *params);
static struct ovsdb_jsonrpc_trigger *
ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
                           const struct json *id, size_t hash);
static void
ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t);
static void
ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s);
static void
ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s);
73
74 /* Monitors. */
/* Forward declarations for the monitor entry points used by the request
 * dispatcher and by session teardown. */
static struct json *
ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s,
                             struct json *params);
static struct jsonrpc_msg *
ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
                             struct json_array *params,
                             const struct json *request_id);
static void
ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s);
82 \f
83 /* JSON-RPC database server. */
84
85 struct ovsdb_jsonrpc_server {
86     struct ovsdb_server up;
87     unsigned int n_sessions, max_sessions;
88     struct shash remotes;      /* Contains "struct ovsdb_jsonrpc_remote *"s. */
89 };
90
91 /* A configured remote.  This is either a passive stream listener plus a list
92  * of the currently connected sessions, or a list of exactly one active
93  * session. */
94 struct ovsdb_jsonrpc_remote {
95     struct ovsdb_jsonrpc_server *server;
96     struct pstream *listener;   /* Listener, if passive. */
97     struct list sessions;       /* List of "struct ovsdb_jsonrpc_session"s. */
98 };
99
/* Forward declarations: adding a remote to, and removing a remote from, a
 * server's 'remotes' table. */
static struct ovsdb_jsonrpc_remote *
ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
                                const char *name,
                                const struct ovsdb_jsonrpc_options *options);
static void
ovsdb_jsonrpc_server_del_remote(struct shash_node *node);
105
106 struct ovsdb_jsonrpc_server *
107 ovsdb_jsonrpc_server_create(struct ovsdb *db)
108 {
109     struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
110     ovsdb_server_init(&server->up, db);
111     server->max_sessions = 64;
112     shash_init(&server->remotes);
113     return server;
114 }
115
116 void
117 ovsdb_jsonrpc_server_destroy(struct ovsdb_jsonrpc_server *svr)
118 {
119     struct shash_node *node, *next;
120
121     SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
122         ovsdb_jsonrpc_server_del_remote(node);
123     }
124     shash_destroy(&svr->remotes);
125     ovsdb_server_destroy(&svr->up);
126     free(svr);
127 }
128
129 struct ovsdb_jsonrpc_options *
130 ovsdb_jsonrpc_default_options(void)
131 {
132     struct ovsdb_jsonrpc_options *options = xzalloc(sizeof *options);
133     options->probe_interval = RECONNECT_DEFAULT_PROBE_INTERVAL;
134     options->max_backoff = RECONNECT_DEFAULT_MAX_BACKOFF;
135     return options;
136 }
137
138 /* Sets 'svr''s current set of remotes to the names in 'new_remotes', with
139  * options in the struct ovsdb_jsonrpc_options supplied as the data values.
140  *
141  * A remote is an active or passive stream connection method, e.g. "pssl:" or
142  * "tcp:1.2.3.4". */
143 void
144 ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server *svr,
145                                  const struct shash *new_remotes)
146 {
147     struct shash_node *node, *next;
148
149     SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
150         if (!shash_find(new_remotes, node->name)) {
151             VLOG_INFO("%s: remote deconfigured", node->name);
152             ovsdb_jsonrpc_server_del_remote(node);
153         }
154     }
155     SHASH_FOR_EACH (node, new_remotes) {
156         const struct ovsdb_jsonrpc_options *options = node->data;
157         struct ovsdb_jsonrpc_remote *remote;
158
159         remote = shash_find_data(&svr->remotes, node->name);
160         if (!remote) {
161             remote = ovsdb_jsonrpc_server_add_remote(svr, node->name, options);
162             if (!remote) {
163                 continue;
164             }
165         }
166
167         ovsdb_jsonrpc_session_set_all_options(remote, options);
168     }
169 }
170
171 static struct ovsdb_jsonrpc_remote *
172 ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
173                                 const char *name,
174                                 const struct ovsdb_jsonrpc_options *options)
175 {
176     struct ovsdb_jsonrpc_remote *remote;
177     struct pstream *listener;
178     int error;
179
180     error = jsonrpc_pstream_open(name, &listener, options->dscp);
181     if (error && error != EAFNOSUPPORT) {
182         VLOG_ERR_RL(&rl, "%s: listen failed: %s", name, strerror(error));
183         return NULL;
184     }
185
186     remote = xmalloc(sizeof *remote);
187     remote->server = svr;
188     remote->listener = listener;
189     list_init(&remote->sessions);
190     shash_add(&svr->remotes, name, remote);
191
192     if (!listener) {
193         ovsdb_jsonrpc_session_create(remote, jsonrpc_session_open(name));
194     }
195     return remote;
196 }
197
198 static void
199 ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
200 {
201     struct ovsdb_jsonrpc_remote *remote = node->data;
202
203     ovsdb_jsonrpc_session_close_all(remote);
204     pstream_close(remote->listener);
205     shash_delete(&remote->server->remotes, node);
206     free(remote);
207 }
208
209 /* Stores status information for the remote named 'target', which should have
210  * been configured on 'svr' with a call to ovsdb_jsonrpc_server_set_remotes(),
211  * into '*status'.  On success returns true, on failure (if 'svr' doesn't have
212  * a remote named 'target' or if that remote is an inbound remote that has no
213  * active connections) returns false.  On failure, 'status' will be zeroed.
214  */
215 bool
216 ovsdb_jsonrpc_server_get_remote_status(
217     const struct ovsdb_jsonrpc_server *svr, const char *target,
218     struct ovsdb_jsonrpc_remote_status *status)
219 {
220     const struct ovsdb_jsonrpc_remote *remote;
221
222     memset(status, 0, sizeof *status);
223
224     remote = shash_find_data(&svr->remotes, target);
225     return remote && ovsdb_jsonrpc_session_get_status(remote, status);
226 }
227
228 void
229 ovsdb_jsonrpc_server_free_remote_status(
230     struct ovsdb_jsonrpc_remote_status *status)
231 {
232     free(status->locks_held);
233     free(status->locks_waiting);
234     free(status->locks_lost);
235 }
236
237 /* Forces all of the JSON-RPC sessions managed by 'svr' to disconnect and
238  * reconnect. */
239 void
240 ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server *svr)
241 {
242     struct shash_node *node;
243
244     SHASH_FOR_EACH (node, &svr->remotes) {
245         struct ovsdb_jsonrpc_remote *remote = node->data;
246
247         ovsdb_jsonrpc_session_reconnect_all(remote);
248     }
249 }
250
251 void
252 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
253 {
254     struct shash_node *node;
255
256     SHASH_FOR_EACH (node, &svr->remotes) {
257         struct ovsdb_jsonrpc_remote *remote = node->data;
258
259         if (remote->listener && svr->n_sessions < svr->max_sessions) {
260             struct stream *stream;
261             int error;
262
263             error = pstream_accept(remote->listener, &stream);
264             if (!error) {
265                 struct jsonrpc_session *js;
266                 js = jsonrpc_session_open_unreliably(jsonrpc_open(stream));
267                 ovsdb_jsonrpc_session_create(remote, js);
268             } else if (error != EAGAIN) {
269                 VLOG_WARN_RL(&rl, "%s: accept failed: %s",
270                              pstream_get_name(remote->listener),
271                              strerror(error));
272             }
273         }
274
275         ovsdb_jsonrpc_session_run_all(remote);
276     }
277 }
278
279 void
280 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
281 {
282     struct shash_node *node;
283
284     SHASH_FOR_EACH (node, &svr->remotes) {
285         struct ovsdb_jsonrpc_remote *remote = node->data;
286
287         if (remote->listener && svr->n_sessions < svr->max_sessions) {
288             pstream_wait(remote->listener);
289         }
290
291         ovsdb_jsonrpc_session_wait_all(remote);
292     }
293 }
294 \f
295 /* JSON-RPC database server session. */
296
297 struct ovsdb_jsonrpc_session {
298     struct list node;           /* Element in remote's sessions list. */
299     struct ovsdb_session up;
300     struct ovsdb_jsonrpc_remote *remote;
301
302     /* Triggers. */
303     struct hmap triggers;       /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
304
305     /* Monitors. */
306     struct hmap monitors;       /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */
307
308     /* Network connectivity. */
309     struct jsonrpc_session *js;  /* JSON-RPC session. */
310     unsigned int js_seqno;       /* Last jsonrpc_session_get_seqno() value. */
311 };
312
/* Forward declarations for the functions that operate on one session. */
static void
ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s);
static int
ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s);
static void
ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s);
static void
ovsdb_jsonrpc_session_set_options(
    struct ovsdb_jsonrpc_session *session,
    const struct ovsdb_jsonrpc_options *options);
static void
ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
                                  struct jsonrpc_msg *request);
static void
ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
                                 struct jsonrpc_msg *request);
322
323 static struct ovsdb_jsonrpc_session *
324 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
325                              struct jsonrpc_session *js)
326 {
327     struct ovsdb_jsonrpc_session *s;
328
329     s = xzalloc(sizeof *s);
330     ovsdb_session_init(&s->up, remote->server->up.db);
331     s->remote = remote;
332     list_push_back(&remote->sessions, &s->node);
333     hmap_init(&s->triggers);
334     hmap_init(&s->monitors);
335     s->js = js;
336     s->js_seqno = jsonrpc_session_get_seqno(js);
337
338     remote->server->n_sessions++;
339
340     return s;
341 }
342
343 static void
344 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
345 {
346     ovsdb_jsonrpc_monitor_remove_all(s);
347     ovsdb_jsonrpc_session_unlock_all(s);
348     jsonrpc_session_close(s->js);
349     list_remove(&s->node);
350     ovsdb_session_destroy(&s->up);
351     s->remote->server->n_sessions--;
352     ovsdb_session_destroy(&s->up);
353     free(s);
354 }
355
356 static int
357 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
358 {
359     jsonrpc_session_run(s->js);
360     if (s->js_seqno != jsonrpc_session_get_seqno(s->js)) {
361         s->js_seqno = jsonrpc_session_get_seqno(s->js);
362         ovsdb_jsonrpc_trigger_complete_all(s);
363         ovsdb_jsonrpc_monitor_remove_all(s);
364         ovsdb_jsonrpc_session_unlock_all(s);
365     }
366
367     ovsdb_jsonrpc_trigger_complete_done(s);
368
369     if (!jsonrpc_session_get_backlog(s->js)) {
370         struct jsonrpc_msg *msg = jsonrpc_session_recv(s->js);
371         if (msg) {
372             if (msg->type == JSONRPC_REQUEST) {
373                 ovsdb_jsonrpc_session_got_request(s, msg);
374             } else if (msg->type == JSONRPC_NOTIFY) {
375                 ovsdb_jsonrpc_session_got_notify(s, msg);
376             } else {
377                 VLOG_WARN("%s: received unexpected %s message",
378                           jsonrpc_session_get_name(s->js),
379                           jsonrpc_msg_type_to_string(msg->type));
380                 jsonrpc_session_force_reconnect(s->js);
381                 jsonrpc_msg_destroy(msg);
382             }
383         }
384     }
385     return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT;
386 }
387
388 static void
389 ovsdb_jsonrpc_session_set_options(struct ovsdb_jsonrpc_session *session,
390                                   const struct ovsdb_jsonrpc_options *options)
391 {
392     jsonrpc_session_set_max_backoff(session->js, options->max_backoff);
393     jsonrpc_session_set_probe_interval(session->js, options->probe_interval);
394     jsonrpc_session_set_dscp(session->js, options->dscp);
395 }
396
397 static void
398 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote)
399 {
400     struct ovsdb_jsonrpc_session *s, *next;
401
402     LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
403         int error = ovsdb_jsonrpc_session_run(s);
404         if (error) {
405             ovsdb_jsonrpc_session_close(s);
406         }
407     }
408 }
409
410 static void
411 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
412 {
413     jsonrpc_session_wait(s->js);
414     if (!jsonrpc_session_get_backlog(s->js)) {
415         jsonrpc_session_recv_wait(s->js);
416     }
417 }
418
419 static void
420 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *remote)
421 {
422     struct ovsdb_jsonrpc_session *s;
423
424     LIST_FOR_EACH (s, node, &remote->sessions) {
425         ovsdb_jsonrpc_session_wait(s);
426     }
427 }
428
429 static void
430 ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *remote)
431 {
432     struct ovsdb_jsonrpc_session *s, *next;
433
434     LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
435         ovsdb_jsonrpc_session_close(s);
436     }
437 }
438
439 /* Forces all of the JSON-RPC sessions managed by 'remote' to disconnect and
440  * reconnect. */
441 static void
442 ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *remote)
443 {
444     struct ovsdb_jsonrpc_session *s, *next;
445
446     LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
447         jsonrpc_session_force_reconnect(s->js);
448         if (!jsonrpc_session_is_alive(s->js)) {
449             ovsdb_jsonrpc_session_close(s);
450         }
451     }
452 }
453
454 /* Sets the options for all of the JSON-RPC sessions managed by 'remote' to
455  * 'options'. */
456 static void
457 ovsdb_jsonrpc_session_set_all_options(
458     struct ovsdb_jsonrpc_remote *remote,
459     const struct ovsdb_jsonrpc_options *options)
460 {
461     struct ovsdb_jsonrpc_session *s;
462
463     LIST_FOR_EACH (s, node, &remote->sessions) {
464         ovsdb_jsonrpc_session_set_options(s, options);
465     }
466 }
467
468 static bool
469 ovsdb_jsonrpc_session_get_status(const struct ovsdb_jsonrpc_remote *remote,
470                                  struct ovsdb_jsonrpc_remote_status *status)
471 {
472     const struct ovsdb_jsonrpc_session *s;
473     const struct jsonrpc_session *js;
474     struct ovsdb_lock_waiter *waiter;
475     struct reconnect_stats rstats;
476     struct ds locks_held, locks_waiting, locks_lost;
477
478     if (list_is_empty(&remote->sessions)) {
479         return false;
480     }
481     s = CONTAINER_OF(remote->sessions.next, struct ovsdb_jsonrpc_session, node);
482     js = s->js;
483
484     status->is_connected = jsonrpc_session_is_connected(js);
485     status->last_error = jsonrpc_session_get_status(js);
486
487     jsonrpc_session_get_reconnect_stats(js, &rstats);
488     status->state = rstats.state;
489     status->sec_since_connect = rstats.msec_since_connect == UINT_MAX
490         ? UINT_MAX : rstats.msec_since_connect / 1000;
491     status->sec_since_disconnect = rstats.msec_since_disconnect == UINT_MAX
492         ? UINT_MAX : rstats.msec_since_disconnect / 1000;
493
494     ds_init(&locks_held);
495     ds_init(&locks_waiting);
496     ds_init(&locks_lost);
497     HMAP_FOR_EACH (waiter, session_node, &s->up.waiters) {
498         struct ds *string;
499
500         string = (ovsdb_lock_waiter_is_owner(waiter) ? &locks_held
501                   : waiter->mode == OVSDB_LOCK_WAIT ? &locks_waiting
502                   : &locks_lost);
503         if (string->length) {
504             ds_put_char(string, ' ');
505         }
506         ds_put_cstr(string, waiter->lock_name);
507     }
508     status->locks_held = ds_steal_cstr(&locks_held);
509     status->locks_waiting = ds_steal_cstr(&locks_waiting);
510     status->locks_lost = ds_steal_cstr(&locks_lost);
511
512     status->n_connections = list_size(&remote->sessions);
513
514     return true;
515 }
516
517 static const char *
518 get_db_name(const struct ovsdb_jsonrpc_session *s)
519 {
520     return s->remote->server->up.db->schema->name;
521 }
522
523 static struct jsonrpc_msg *
524 ovsdb_jsonrpc_check_db_name(const struct ovsdb_jsonrpc_session *s,
525                             const struct jsonrpc_msg *request)
526 {
527     struct json_array *params;
528     const char *want_db_name;
529     const char *have_db_name;
530     struct ovsdb_error *error;
531     struct jsonrpc_msg *reply;
532
533     params = json_array(request->params);
534     if (!params->n || params->elems[0]->type != JSON_STRING) {
535         error = ovsdb_syntax_error(
536             request->params, NULL,
537             "%s request params must begin with <db-name>", request->method);
538         goto error;
539     }
540
541     want_db_name = params->elems[0]->u.string;
542     have_db_name = get_db_name(s);
543     if (strcmp(want_db_name, have_db_name)) {
544         error = ovsdb_syntax_error(
545             request->params, "unknown database",
546             "%s request specifies unknown database %s",
547             request->method, want_db_name);
548         goto error;
549     }
550
551     return NULL;
552
553 error:
554     reply = jsonrpc_create_reply(ovsdb_error_to_json(error), request->id);
555     ovsdb_error_destroy(error);
556     return reply;
557 }
558
559 static struct ovsdb_error *
560 ovsdb_jsonrpc_session_parse_lock_name(const struct jsonrpc_msg *request,
561                                       const char **lock_namep)
562 {
563     const struct json_array *params;
564
565     params = json_array(request->params);
566     if (params->n != 1 || params->elems[0]->type != JSON_STRING ||
567         !ovsdb_parser_is_id(json_string(params->elems[0]))) {
568         *lock_namep = NULL;
569         return ovsdb_syntax_error(request->params, NULL,
570                                   "%s request params must be <id>",
571                                   request->method);
572     }
573
574     *lock_namep = json_string(params->elems[0]);
575     return NULL;
576 }
577
578 static void
579 ovsdb_jsonrpc_session_notify(struct ovsdb_session *session,
580                              const char *lock_name,
581                              const char *method)
582 {
583     struct ovsdb_jsonrpc_session *s;
584     struct json *params;
585
586     s = CONTAINER_OF(session, struct ovsdb_jsonrpc_session, up);
587     params = json_array_create_1(json_string_create(lock_name));
588     jsonrpc_session_send(s->js, jsonrpc_create_notify(method, params));
589 }
590
591 static struct jsonrpc_msg *
592 ovsdb_jsonrpc_session_lock(struct ovsdb_jsonrpc_session *s,
593                            struct jsonrpc_msg *request,
594                            enum ovsdb_lock_mode mode)
595 {
596     struct ovsdb_lock_waiter *waiter;
597     struct jsonrpc_msg *reply;
598     struct ovsdb_error *error;
599     struct ovsdb_session *victim;
600     const char *lock_name;
601     struct json *result;
602
603     error = ovsdb_jsonrpc_session_parse_lock_name(request, &lock_name);
604     if (error) {
605         goto error;
606     }
607
608     /* Report error if this session has issued a "lock" or "steal" without a
609      * matching "unlock" for this lock. */
610     waiter = ovsdb_session_get_lock_waiter(&s->up, lock_name);
611     if (waiter) {
612         error = ovsdb_syntax_error(
613             request->params, NULL,
614             "must issue \"unlock\" before new \"%s\"", request->method);
615         goto error;
616     }
617
618     /* Get the lock, add us as a waiter. */
619     waiter = ovsdb_server_lock(&s->remote->server->up, &s->up, lock_name, mode,
620                                &victim);
621     if (victim) {
622         ovsdb_jsonrpc_session_notify(victim, lock_name, "stolen");
623     }
624
625     result = json_object_create();
626     json_object_put(result, "locked",
627                     json_boolean_create(ovsdb_lock_waiter_is_owner(waiter)));
628
629     return jsonrpc_create_reply(result, request->id);
630
631 error:
632     reply = jsonrpc_create_reply(ovsdb_error_to_json(error), request->id);
633     ovsdb_error_destroy(error);
634     return reply;
635 }
636
637 static void
638 ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *s)
639 {
640     struct ovsdb_lock_waiter *waiter, *next;
641
642     HMAP_FOR_EACH_SAFE (waiter, next, session_node, &s->up.waiters) {
643         ovsdb_jsonrpc_session_unlock__(waiter);
644     }
645 }
646
647 static void
648 ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *waiter)
649 {
650     struct ovsdb_lock *lock = waiter->lock;
651
652     if (lock) {
653         struct ovsdb_session *new_owner = ovsdb_lock_waiter_remove(waiter);
654         if (new_owner) {
655             ovsdb_jsonrpc_session_notify(new_owner, lock->name, "locked");
656         } else {
657             /* ovsdb_server_lock() might have freed 'lock'. */
658         }
659     }
660
661     ovsdb_lock_waiter_destroy(waiter);
662 }
663
664 static struct jsonrpc_msg *
665 ovsdb_jsonrpc_session_unlock(struct ovsdb_jsonrpc_session *s,
666                              struct jsonrpc_msg *request)
667 {
668     struct ovsdb_lock_waiter *waiter;
669     struct jsonrpc_msg *reply;
670     struct ovsdb_error *error;
671     const char *lock_name;
672
673     error = ovsdb_jsonrpc_session_parse_lock_name(request, &lock_name);
674     if (error) {
675         goto error;
676     }
677
678     /* Report error if this session has not issued a "lock" or "steal" for this
679      * lock. */
680     waiter = ovsdb_session_get_lock_waiter(&s->up, lock_name);
681     if (!waiter) {
682         error = ovsdb_syntax_error(
683             request->params, NULL, "\"unlock\" without \"lock\" or \"steal\"");
684         goto error;
685     }
686
687     ovsdb_jsonrpc_session_unlock__(waiter);
688
689     return jsonrpc_create_reply(json_object_create(), request->id);
690
691 error:
692     reply = jsonrpc_create_reply(ovsdb_error_to_json(error), request->id);
693     ovsdb_error_destroy(error);
694     return reply;
695 }
696
697 static struct jsonrpc_msg *
698 execute_transaction(struct ovsdb_jsonrpc_session *s,
699                     struct jsonrpc_msg *request)
700 {
701     ovsdb_jsonrpc_trigger_create(s, request->id, request->params);
702     request->id = NULL;
703     request->params = NULL;
704     jsonrpc_msg_destroy(request);
705     return NULL;
706 }
707
708 static void
709 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
710                                   struct jsonrpc_msg *request)
711 {
712     struct jsonrpc_msg *reply;
713
714     if (!strcmp(request->method, "transact")) {
715         reply = ovsdb_jsonrpc_check_db_name(s, request);
716         if (!reply) {
717             reply = execute_transaction(s, request);
718         }
719     } else if (!strcmp(request->method, "monitor")) {
720         reply = ovsdb_jsonrpc_check_db_name(s, request);
721         if (!reply) {
722             reply = jsonrpc_create_reply(
723                 ovsdb_jsonrpc_monitor_create(s, request->params), request->id);
724         }
725     } else if (!strcmp(request->method, "monitor_cancel")) {
726         reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
727                                              request->id);
728     } else if (!strcmp(request->method, "get_schema")) {
729         reply = ovsdb_jsonrpc_check_db_name(s, request);
730         if (!reply) {
731             reply = jsonrpc_create_reply(
732                 ovsdb_schema_to_json(s->remote->server->up.db->schema),
733                 request->id);
734         }
735     } else if (!strcmp(request->method, "list_dbs")) {
736         reply = jsonrpc_create_reply(
737             json_array_create_1(json_string_create(get_db_name(s))),
738             request->id);
739     } else if (!strcmp(request->method, "lock")) {
740         reply = ovsdb_jsonrpc_session_lock(s, request, OVSDB_LOCK_WAIT);
741     } else if (!strcmp(request->method, "steal")) {
742         reply = ovsdb_jsonrpc_session_lock(s, request, OVSDB_LOCK_STEAL);
743     } else if (!strcmp(request->method, "unlock")) {
744         reply = ovsdb_jsonrpc_session_unlock(s, request);
745     } else if (!strcmp(request->method, "echo")) {
746         reply = jsonrpc_create_reply(json_clone(request->params), request->id);
747     } else {
748         reply = jsonrpc_create_error(json_string_create("unknown method"),
749                                      request->id);
750     }
751
752     if (reply) {
753         jsonrpc_msg_destroy(request);
754         jsonrpc_session_send(s->js, reply);
755     }
756 }
757
758 static void
759 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
760 {
761     if (json_array(request->params)->n == 1) {
762         struct ovsdb_jsonrpc_trigger *t;
763         struct json *id;
764
765         id = request->params->u.array.elems[0];
766         t = ovsdb_jsonrpc_trigger_find(s, id, json_hash(id, 0));
767         if (t) {
768             ovsdb_jsonrpc_trigger_complete(t);
769         }
770     }
771 }
772
773 static void
774 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
775                                  struct jsonrpc_msg *request)
776 {
777     if (!strcmp(request->method, "cancel")) {
778         execute_cancel(s, request);
779     }
780     jsonrpc_msg_destroy(request);
781 }
782 \f
783 /* JSON-RPC database server triggers.
784  *
785  * (Every transaction is treated as a trigger even if it doesn't actually have
786  * any "wait" operations.) */
787
788 struct ovsdb_jsonrpc_trigger {
789     struct ovsdb_trigger trigger;
790     struct hmap_node hmap_node; /* In session's "triggers" hmap. */
791     struct json *id;
792 };
793
794 static void
795 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s,
796                              struct json *id, struct json *params)
797 {
798     struct ovsdb_jsonrpc_trigger *t;
799     size_t hash;
800
801     /* Check for duplicate ID. */
802     hash = json_hash(id, 0);
803     t = ovsdb_jsonrpc_trigger_find(s, id, hash);
804     if (t) {
805         struct jsonrpc_msg *msg;
806
807         msg = jsonrpc_create_error(json_string_create("duplicate request ID"),
808                                    id);
809         jsonrpc_session_send(s->js, msg);
810         json_destroy(id);
811         json_destroy(params);
812         return;
813     }
814
815     /* Insert into trigger table. */
816     t = xmalloc(sizeof *t);
817     ovsdb_trigger_init(&s->up, &t->trigger, params, time_msec());
818     t->id = id;
819     hmap_insert(&s->triggers, &t->hmap_node, hash);
820
821     /* Complete early if possible. */
822     if (ovsdb_trigger_is_complete(&t->trigger)) {
823         ovsdb_jsonrpc_trigger_complete(t);
824     }
825 }
826
827 static struct ovsdb_jsonrpc_trigger *
828 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
829                            const struct json *id, size_t hash)
830 {
831     struct ovsdb_jsonrpc_trigger *t;
832
833     HMAP_FOR_EACH_WITH_HASH (t, hmap_node, hash, &s->triggers) {
834         if (json_equal(t->id, id)) {
835             return t;
836         }
837     }
838
839     return NULL;
840 }
841
842 static void
843 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
844 {
845     struct ovsdb_jsonrpc_session *s;
846
847     s = CONTAINER_OF(t->trigger.session, struct ovsdb_jsonrpc_session, up);
848
849     if (jsonrpc_session_is_connected(s->js)) {
850         struct jsonrpc_msg *reply;
851         struct json *result;
852
853         result = ovsdb_trigger_steal_result(&t->trigger);
854         if (result) {
855             reply = jsonrpc_create_reply(result, t->id);
856         } else {
857             reply = jsonrpc_create_error(json_string_create("canceled"),
858                                          t->id);
859         }
860         jsonrpc_session_send(s->js, reply);
861     }
862
863     json_destroy(t->id);
864     ovsdb_trigger_destroy(&t->trigger);
865     hmap_remove(&s->triggers, &t->hmap_node);
866     free(t);
867 }
868
869 static void
870 ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
871 {
872     struct ovsdb_jsonrpc_trigger *t, *next;
873     HMAP_FOR_EACH_SAFE (t, next, hmap_node, &s->triggers) {
874         ovsdb_jsonrpc_trigger_complete(t);
875     }
876 }
877
878 static void
879 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
880 {
881     while (!list_is_empty(&s->up.completions)) {
882         struct ovsdb_jsonrpc_trigger *t
883             = CONTAINER_OF(s->up.completions.next,
884                            struct ovsdb_jsonrpc_trigger, trigger.node);
885         ovsdb_jsonrpc_trigger_complete(t);
886     }
887 }
888 \f
889 /* JSON-RPC database table monitors. */
890
/* Bit flags that select which kinds of row events a monitor reports.  The
 * values are distinct bits so that they can be combined with bitwise OR. */
enum ovsdb_jsonrpc_monitor_selection {
    OJMS_INITIAL = (1 << 0),    /* Every row present at monitor creation. */
    OJMS_INSERT  = (1 << 1),    /* Rows that are added. */
    OJMS_DELETE  = (1 << 2),    /* Rows that are removed. */
    OJMS_MODIFY  = (1 << 3)     /* Rows that are changed. */
};
897
898 /* A particular column being monitored. */
899 struct ovsdb_jsonrpc_monitor_column {
900     const struct ovsdb_column *column;
901     enum ovsdb_jsonrpc_monitor_selection select;
902 };
903
904 /* A particular table being monitored. */
905 struct ovsdb_jsonrpc_monitor_table {
906     const struct ovsdb_table *table;
907
908     /* This is the union (bitwise-OR) of the 'select' values in all of the
909      * members of 'columns' below. */
910     enum ovsdb_jsonrpc_monitor_selection select;
911
912     /* Columns being monitored. */
913     struct ovsdb_jsonrpc_monitor_column *columns;
914     size_t n_columns;
915 };
916
917 /* A collection of tables being monitored. */
918 struct ovsdb_jsonrpc_monitor {
919     struct ovsdb_replica replica;
920     struct ovsdb_jsonrpc_session *session;
921     struct hmap_node node;      /* In ovsdb_jsonrpc_session's "monitors". */
922
923     struct json *monitor_id;
924     struct shash tables;     /* Holds "struct ovsdb_jsonrpc_monitor_table"s. */
925 };
926
927 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
928
929 struct ovsdb_jsonrpc_monitor *ovsdb_jsonrpc_monitor_find(
930     struct ovsdb_jsonrpc_session *, const struct json *monitor_id);
931 static void ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *);
932 static struct json *ovsdb_jsonrpc_monitor_get_initial(
933     const struct ovsdb_jsonrpc_monitor *);
934
935 static bool
936 parse_bool(struct ovsdb_parser *parser, const char *name, bool default_value)
937 {
938     const struct json *json;
939
940     json = ovsdb_parser_member(parser, name, OP_BOOLEAN | OP_OPTIONAL);
941     return json ? json_boolean(json) : default_value;
942 }
943
944 struct ovsdb_jsonrpc_monitor *
945 ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session *s,
946                            const struct json *monitor_id)
947 {
948     struct ovsdb_jsonrpc_monitor *m;
949
950     HMAP_FOR_EACH_WITH_HASH (m, node, json_hash(monitor_id, 0), &s->monitors) {
951         if (json_equal(m->monitor_id, monitor_id)) {
952             return m;
953         }
954     }
955
956     return NULL;
957 }
958
959 static void
960 ovsdb_jsonrpc_add_monitor_column(struct ovsdb_jsonrpc_monitor_table *mt,
961                                  const struct ovsdb_column *column,
962                                  enum ovsdb_jsonrpc_monitor_selection select,
963                                  size_t *allocated_columns)
964 {
965     struct ovsdb_jsonrpc_monitor_column *c;
966
967     if (mt->n_columns >= *allocated_columns) {
968         mt->columns = x2nrealloc(mt->columns, allocated_columns,
969                                  sizeof *mt->columns);
970     }
971
972     c = &mt->columns[mt->n_columns++];
973     c->column = column;
974     c->select = select;
975 }
976
977 static int
978 compare_ovsdb_jsonrpc_monitor_column(const void *a_, const void *b_)
979 {
980     const struct ovsdb_jsonrpc_monitor_column *a = a_;
981     const struct ovsdb_jsonrpc_monitor_column *b = b_;
982
983     return a->column < b->column ? -1 : a->column > b->column;
984 }
985
986 static struct ovsdb_error * WARN_UNUSED_RESULT
987 ovsdb_jsonrpc_parse_monitor_request(struct ovsdb_jsonrpc_monitor_table *mt,
988                                     const struct json *monitor_request,
989                                     size_t *allocated_columns)
990 {
991     const struct ovsdb_table_schema *ts = mt->table->schema;
992     enum ovsdb_jsonrpc_monitor_selection select;
993     const struct json *columns, *select_json;
994     struct ovsdb_parser parser;
995     struct ovsdb_error *error;
996
997     ovsdb_parser_init(&parser, monitor_request, "table %s", ts->name);
998     columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | OP_OPTIONAL);
999     select_json = ovsdb_parser_member(&parser, "select",
1000                                       OP_OBJECT | OP_OPTIONAL);
1001     error = ovsdb_parser_finish(&parser);
1002     if (error) {
1003         return error;
1004     }
1005
1006     if (select_json) {
1007         select = 0;
1008         ovsdb_parser_init(&parser, select_json, "table %s select", ts->name);
1009         if (parse_bool(&parser, "initial", true)) {
1010             select |= OJMS_INITIAL;
1011         }
1012         if (parse_bool(&parser, "insert", true)) {
1013             select |= OJMS_INSERT;
1014         }
1015         if (parse_bool(&parser, "delete", true)) {
1016             select |= OJMS_DELETE;
1017         }
1018         if (parse_bool(&parser, "modify", true)) {
1019             select |= OJMS_MODIFY;
1020         }
1021         error = ovsdb_parser_finish(&parser);
1022         if (error) {
1023             return error;
1024         }
1025     } else {
1026         select = OJMS_INITIAL | OJMS_INSERT | OJMS_DELETE | OJMS_MODIFY;
1027     }
1028     mt->select |= select;
1029
1030     if (columns) {
1031         size_t i;
1032
1033         if (columns->type != JSON_ARRAY) {
1034             return ovsdb_syntax_error(columns, NULL,
1035                                       "array of column names expected");
1036         }
1037
1038         for (i = 0; i < columns->u.array.n; i++) {
1039             const struct ovsdb_column *column;
1040             const char *s;
1041
1042             if (columns->u.array.elems[i]->type != JSON_STRING) {
1043                 return ovsdb_syntax_error(columns, NULL,
1044                                           "array of column names expected");
1045             }
1046
1047             s = columns->u.array.elems[i]->u.string;
1048             column = shash_find_data(&mt->table->schema->columns, s);
1049             if (!column) {
1050                 return ovsdb_syntax_error(columns, NULL, "%s is not a valid "
1051                                           "column name", s);
1052             }
1053             ovsdb_jsonrpc_add_monitor_column(mt, column, select,
1054                                              allocated_columns);
1055         }
1056     } else {
1057         struct shash_node *node;
1058
1059         SHASH_FOR_EACH (node, &ts->columns) {
1060             const struct ovsdb_column *column = node->data;
1061             if (column->index != OVSDB_COL_UUID) {
1062                 ovsdb_jsonrpc_add_monitor_column(mt, column, select,
1063                                                  allocated_columns);
1064             }
1065         }
1066     }
1067
1068     return NULL;
1069 }
1070
1071 static struct json *
1072 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s,
1073                              struct json *params)
1074 {
1075     struct ovsdb_jsonrpc_monitor *m = NULL;
1076     struct json *monitor_id, *monitor_requests;
1077     struct ovsdb_error *error = NULL;
1078     struct shash_node *node;
1079     struct json *json;
1080
1081     if (json_array(params)->n != 3) {
1082         error = ovsdb_syntax_error(params, NULL, "invalid parameters");
1083         goto error;
1084     }
1085     monitor_id = params->u.array.elems[1];
1086     monitor_requests = params->u.array.elems[2];
1087     if (monitor_requests->type != JSON_OBJECT) {
1088         error = ovsdb_syntax_error(monitor_requests, NULL,
1089                                    "monitor-requests must be object");
1090         goto error;
1091     }
1092
1093     if (ovsdb_jsonrpc_monitor_find(s, monitor_id)) {
1094         error = ovsdb_syntax_error(monitor_id, NULL, "duplicate monitor ID");
1095         goto error;
1096     }
1097
1098     m = xzalloc(sizeof *m);
1099     ovsdb_replica_init(&m->replica, &ovsdb_jsonrpc_replica_class);
1100     ovsdb_add_replica(s->remote->server->up.db, &m->replica);
1101     m->session = s;
1102     hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
1103     m->monitor_id = json_clone(monitor_id);
1104     shash_init(&m->tables);
1105
1106     SHASH_FOR_EACH (node, json_object(monitor_requests)) {
1107         const struct ovsdb_table *table;
1108         struct ovsdb_jsonrpc_monitor_table *mt;
1109         size_t allocated_columns;
1110         const struct json *mr_value;
1111         size_t i;
1112
1113         table = ovsdb_get_table(s->remote->server->up.db, node->name);
1114         if (!table) {
1115             error = ovsdb_syntax_error(NULL, NULL,
1116                                        "no table named %s", node->name);
1117             goto error;
1118         }
1119
1120         mt = xzalloc(sizeof *mt);
1121         mt->table = table;
1122         shash_add(&m->tables, table->schema->name, mt);
1123
1124         /* Parse columns. */
1125         mr_value = node->data;
1126         allocated_columns = 0;
1127         if (mr_value->type == JSON_ARRAY) {
1128             const struct json_array *array = &mr_value->u.array;
1129
1130             for (i = 0; i < array->n; i++) {
1131                 error = ovsdb_jsonrpc_parse_monitor_request(
1132                     mt, array->elems[i], &allocated_columns);
1133                 if (error) {
1134                     goto error;
1135                 }
1136             }
1137         } else {
1138             error = ovsdb_jsonrpc_parse_monitor_request(
1139                 mt, mr_value, &allocated_columns);
1140             if (error) {
1141                 goto error;
1142             }
1143         }
1144
1145         /* Check for duplicate columns. */
1146         qsort(mt->columns, mt->n_columns, sizeof *mt->columns,
1147               compare_ovsdb_jsonrpc_monitor_column);
1148         for (i = 1; i < mt->n_columns; i++) {
1149             if (mt->columns[i].column == mt->columns[i - 1].column) {
1150                 error = ovsdb_syntax_error(mr_value, NULL, "column %s "
1151                                            "mentioned more than once",
1152                                            mt->columns[i].column->name);
1153                 goto error;
1154             }
1155         }
1156     }
1157
1158     return ovsdb_jsonrpc_monitor_get_initial(m);
1159
1160 error:
1161     if (m) {
1162         ovsdb_remove_replica(s->remote->server->up.db, &m->replica);
1163     }
1164
1165     json = ovsdb_error_to_json(error);
1166     ovsdb_error_destroy(error);
1167     return json;
1168 }
1169
1170 static struct jsonrpc_msg *
1171 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
1172                              struct json_array *params,
1173                              const struct json *request_id)
1174 {
1175     if (params->n != 1) {
1176         return jsonrpc_create_error(json_string_create("invalid parameters"),
1177                                     request_id);
1178     } else {
1179         struct ovsdb_jsonrpc_monitor *m;
1180
1181         m = ovsdb_jsonrpc_monitor_find(s, params->elems[0]);
1182         if (!m) {
1183             return jsonrpc_create_error(json_string_create("unknown monitor"),
1184                                         request_id);
1185         } else {
1186             ovsdb_remove_replica(s->remote->server->up.db, &m->replica);
1187             return jsonrpc_create_reply(json_object_create(), request_id);
1188         }
1189     }
1190 }
1191
1192 static void
1193 ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
1194 {
1195     struct ovsdb_jsonrpc_monitor *m, *next;
1196
1197     HMAP_FOR_EACH_SAFE (m, next, node, &s->monitors) {
1198         ovsdb_remove_replica(s->remote->server->up.db, &m->replica);
1199     }
1200 }
1201
1202 static struct ovsdb_jsonrpc_monitor *
1203 ovsdb_jsonrpc_monitor_cast(struct ovsdb_replica *replica)
1204 {
1205     assert(replica->class == &ovsdb_jsonrpc_replica_class);
1206     return CONTAINER_OF(replica, struct ovsdb_jsonrpc_monitor, replica);
1207 }
1208
/* State carried across calls to ovsdb_jsonrpc_monitor_change_cb() while the
 * JSON describing one set of row changes is being built.  Initialized by
 * ovsdb_jsonrpc_monitor_init_aux(). */
struct ovsdb_jsonrpc_monitor_aux {
    /* Sending initial contents of table?  If so, every row is reported as
     * an OJMS_INITIAL event. */
    bool initial;

    /* Monitor on whose behalf changes are being collected. */
    const struct ovsdb_jsonrpc_monitor *monitor;

    /* JSON for the whole transaction.  NULL until the first row is
     * reported. */
    struct json *json;

    /* Current table.  'mt' is NULL before the first callback and also while
     * the rows being visited belong to a table that is not monitored. */
    struct ovsdb_jsonrpc_monitor_table *mt;

    /* JSON for table's transaction, stored inside 'json' under the table's
     * name.  NULL until a row of the current table is reported. */
    struct json *table_json;
};
1218
1219 static bool
1220 any_reportable_change(const struct ovsdb_jsonrpc_monitor_table *mt,
1221                       const unsigned long int *changed)
1222 {
1223     size_t i;
1224
1225     for (i = 0; i < mt->n_columns; i++) {
1226         const struct ovsdb_jsonrpc_monitor_column *c = &mt->columns[i];
1227         unsigned int idx = c->column->index;
1228
1229         if (c->select & OJMS_MODIFY && bitmap_is_set(changed, idx)) {
1230             return true;
1231         }
1232     }
1233
1234     return false;
1235 }
1236
1237 static bool
1238 ovsdb_jsonrpc_monitor_change_cb(const struct ovsdb_row *old,
1239                                 const struct ovsdb_row *new,
1240                                 const unsigned long int *changed,
1241                                 void *aux_)
1242 {
1243     struct ovsdb_jsonrpc_monitor_aux *aux = aux_;
1244     const struct ovsdb_jsonrpc_monitor *m = aux->monitor;
1245     struct ovsdb_table *table = new ? new->table : old->table;
1246     enum ovsdb_jsonrpc_monitor_selection type;
1247     struct json *old_json, *new_json;
1248     struct json *row_json;
1249     char uuid[UUID_LEN + 1];
1250     size_t i;
1251
1252     if (!aux->mt || table != aux->mt->table) {
1253         aux->mt = shash_find_data(&m->tables, table->schema->name);
1254         aux->table_json = NULL;
1255         if (!aux->mt) {
1256             /* We don't care about rows in this table at all.  Tell the caller
1257              * to skip it.  */
1258             return false;
1259         }
1260     }
1261
1262     type = (aux->initial ? OJMS_INITIAL
1263             : !old ? OJMS_INSERT
1264             : !new ? OJMS_DELETE
1265             : OJMS_MODIFY);
1266     if (!(aux->mt->select & type)) {
1267         /* We don't care about this type of change (but do want to be called
1268          * back for changes to other rows in the same table). */
1269         return true;
1270     }
1271
1272     if (type == OJMS_MODIFY && !any_reportable_change(aux->mt, changed)) {
1273         /* Nothing of interest changed. */
1274         return true;
1275     }
1276
1277     old_json = new_json = NULL;
1278     if (type & (OJMS_DELETE | OJMS_MODIFY)) {
1279         old_json = json_object_create();
1280     }
1281     if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
1282         new_json = json_object_create();
1283     }
1284     for (i = 0; i < aux->mt->n_columns; i++) {
1285         const struct ovsdb_jsonrpc_monitor_column *c = &aux->mt->columns[i];
1286         const struct ovsdb_column *column = c->column;
1287         unsigned int idx = c->column->index;
1288
1289         if (!(type & c->select)) {
1290             /* We don't care about this type of change for this particular
1291              * column (but we will care about it for some other column). */
1292             continue;
1293         }
1294
1295         if ((type == OJMS_MODIFY && bitmap_is_set(changed, idx))
1296             || type == OJMS_DELETE) {
1297             json_object_put(old_json, column->name,
1298                             ovsdb_datum_to_json(&old->fields[idx],
1299                                                 &column->type));
1300         }
1301         if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
1302             json_object_put(new_json, column->name,
1303                             ovsdb_datum_to_json(&new->fields[idx],
1304                                                 &column->type));
1305         }
1306     }
1307
1308     /* Create JSON object for transaction overall. */
1309     if (!aux->json) {
1310         aux->json = json_object_create();
1311     }
1312
1313     /* Create JSON object for transaction on this table. */
1314     if (!aux->table_json) {
1315         aux->table_json = json_object_create();
1316         json_object_put(aux->json, aux->mt->table->schema->name,
1317                         aux->table_json);
1318     }
1319
1320     /* Create JSON object for transaction on this row. */
1321     row_json = json_object_create();
1322     if (old_json) {
1323         json_object_put(row_json, "old", old_json);
1324     }
1325     if (new_json) {
1326         json_object_put(row_json, "new", new_json);
1327     }
1328
1329     /* Add JSON row to JSON table. */
1330     snprintf(uuid, sizeof uuid,
1331              UUID_FMT, UUID_ARGS(ovsdb_row_get_uuid(new ? new : old)));
1332     json_object_put(aux->table_json, uuid, row_json);
1333
1334     return true;
1335 }
1336
1337 static void
1338 ovsdb_jsonrpc_monitor_init_aux(struct ovsdb_jsonrpc_monitor_aux *aux,
1339                                const struct ovsdb_jsonrpc_monitor *m,
1340                                bool initial)
1341 {
1342     aux->initial = initial;
1343     aux->monitor = m;
1344     aux->json = NULL;
1345     aux->mt = NULL;
1346     aux->table_json = NULL;
1347 }
1348
1349 static struct ovsdb_error *
1350 ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica *replica,
1351                              const struct ovsdb_txn *txn,
1352                              bool durable OVS_UNUSED)
1353 {
1354     struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
1355     struct ovsdb_jsonrpc_monitor_aux aux;
1356
1357     ovsdb_jsonrpc_monitor_init_aux(&aux, m, false);
1358     ovsdb_txn_for_each_change(txn, ovsdb_jsonrpc_monitor_change_cb, &aux);
1359     if (aux.json) {
1360         struct jsonrpc_msg *msg;
1361         struct json *params;
1362
1363         params = json_array_create_2(json_clone(aux.monitor->monitor_id),
1364                                      aux.json);
1365         msg = jsonrpc_create_notify("update", params);
1366         jsonrpc_session_send(aux.monitor->session->js, msg);
1367     }
1368
1369     return NULL;
1370 }
1371
1372 static struct json *
1373 ovsdb_jsonrpc_monitor_get_initial(const struct ovsdb_jsonrpc_monitor *m)
1374 {
1375     struct ovsdb_jsonrpc_monitor_aux aux;
1376     struct shash_node *node;
1377
1378     ovsdb_jsonrpc_monitor_init_aux(&aux, m, true);
1379     SHASH_FOR_EACH (node, &m->tables) {
1380         struct ovsdb_jsonrpc_monitor_table *mt = node->data;
1381
1382         if (mt->select & OJMS_INITIAL) {
1383             struct ovsdb_row *row;
1384
1385             HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
1386                 ovsdb_jsonrpc_monitor_change_cb(NULL, row, NULL, &aux);
1387             }
1388         }
1389     }
1390     return aux.json ? aux.json : json_object_create();
1391 }
1392
1393 static void
1394 ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *replica)
1395 {
1396     struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
1397     struct shash_node *node;
1398
1399     json_destroy(m->monitor_id);
1400     SHASH_FOR_EACH (node, &m->tables) {
1401         struct ovsdb_jsonrpc_monitor_table *mt = node->data;
1402         free(mt->columns);
1403         free(mt);
1404     }
1405     shash_destroy(&m->tables);
1406     hmap_remove(&m->session->monitors, &m->node);
1407     free(m);
1408 }
1409
1410 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {
1411     ovsdb_jsonrpc_monitor_commit,
1412     ovsdb_jsonrpc_monitor_destroy
1413 };